# SUM3-Trading-Terminal / mBA-GMP.v3 / generate_profiles.py
# Uploaded by: algorembrant
# Upload 37 files
# e15ab27 verified
#!/usr/bin/env python3
"""
generate_profiles.py — mBA-GMP.v3 Dataframe & Chart Generator
==============================================================
Produces CSV files and publication-quality PNG charts demonstrating the
Conventional Market Profile (CMP), Gap-filled Market Profile (GMP),
and Up/Down-Bin Footprint Profile using a 10-datapoint XAUUSD example.
Outputs:
CSV: datapoints.csv, cmp_profile.csv, gmp_profile.csv,
updown_profile.csv
PNG: fig_price_scatter.png, fig_cmp_profile.png,
fig_gmp_profile.png, fig_cmp_vs_gmp.png,
fig_combined_3panel.png, fig_updown_footprint.png
"""
import math
import csv
import os
# Optional plotting dependencies: when matplotlib cannot be imported, HAS_MPL
# is False and only a warning is printed here.
try:
    import matplotlib
    matplotlib.use("Agg")  # non-interactive backend
    import matplotlib.pyplot as plt
    import matplotlib.ticker as ticker
    HAS_MPL = True
except ImportError:
    HAS_MPL = False
    # "\u2013" is an en dash; it is written as an escape so the message cannot
    # be garbled again if this file is re-saved with the wrong encoding.
    print("[WARN] matplotlib not found \u2013 CSV files will still be generated "
          "but PNG charts will be skipped.")
# ══════════════════════════════════════════════════════════════════════════════
# 1. RAW DATAPOINTS
# ══════════════════════════════════════════════════════════════════════════════
# Example series: one (label, trade index, price) tuple per datapoint.
DATAPOINTS = [
    ("A", 1, 3000.914),
    ("B", 2, 3003.837),
    ("C", 3, 3002.432),
    ("D", 4, 3009.892),
    ("E", 5, 3007.698),
    ("F", 6, 3009.176),
    ("G", 7, 3003.381),
    ("H", 8, 3004.283),
    ("I", 9, 3003.512),
    ("J", 10, 3003.012),
]
BIN_SIZE = 1 # bin size (beta) = 1 symbol price unit
# ══════════════════════════════════════════════════════════════════════════════
# 2. HELPER FUNCTIONS
# ══════════════════════════════════════════════════════════════════════════════
def bin_index(price: float, beta: float = BIN_SIZE) -> int:
    """Map *price* to the integer index of the bin of width *beta* holding it.

    The index is ``floor(price / beta)``.
    """
    quotient = price / beta
    return int(math.floor(quotient))
def bin_range(price: float, beta: float = BIN_SIZE):
    """Return the ``(price_from, price_until)`` bounds of the bin holding *price*."""
    index = bin_index(price, beta)
    lower = index * beta
    upper = (index + 1) * beta
    return lower, upper
def make_bin_key(b: int, beta: float = BIN_SIZE):
    """Return the ``(price_from, price_until)`` bounds of bin index *b*.

    The result is a 2-tuple; it does not include a bin number.
    """
    price_from = b * beta
    price_until = (b + 1) * beta
    return (price_from, price_until)
# ══════════════════════════════════════════════════════════════════════════════
# 3. CMP CONSTRUCTION
# ══════════════════════════════════════════════════════════════════════════════
def build_cmp(datapoints, beta=BIN_SIZE):
    """Build the Conventional Market Profile (CMP).

    Each datapoint contributes its label, and one stack, to the bin that
    contains its price.

    Args:
        datapoints: Iterable of ``(label, trade, price)`` tuples.
        beta: Bin size used by ``bin_index``.

    Returns:
        dict mapping bin index -> ``{"labels": [str], "count": int}``.
    """
    bins = {}
    for label, _trade, price in datapoints:
        entry = bins.setdefault(bin_index(price, beta),
                                {"labels": [], "count": 0})
        entry["labels"].append(label)
        entry["count"] += 1
    return bins
# ══════════════════════════════════════════════════════════════════════════════
# 4. GMP CONSTRUCTION
# ══════════════════════════════════════════════════════════════════════════════
def build_gmp(datapoints, beta=BIN_SIZE):
    """Build the Gap-filled Market Profile (GMP).

    Two passes are made over *datapoints*:
      1. Every datapoint adds its own label to its own bin (as in the CMP).
      2. For each consecutive pair, every bin strictly between the source
         bin and the destination bin (both endpoints excluded) receives the
         label of the source datapoint.

    Args:
        datapoints: Sequence of ``(label, trade, price)`` tuples.
        beta: Bin size used by ``bin_index``.

    Returns:
        dict mapping bin index -> ``{"labels": [str], "count": int}``.
    """
    bins = {}

    def record(index, label):
        entry = bins.setdefault(index, {"labels": [], "count": 0})
        entry["labels"].append(label)
        entry["count"] += 1

    # Pass 1: each datapoint lands in its own bin.
    for label, _trade, price in datapoints:
        record(bin_index(price, beta), label)

    # Pass 2: fill the bins skipped over between consecutive datapoints.
    for position in range(1, len(datapoints)):
        src_label, _, src_price = datapoints[position - 1]
        _dst_label, _, dst_price = datapoints[position]
        start = bin_index(src_price, beta)
        stop = bin_index(dst_price, beta)
        step = 1 if stop > start else -1
        # The range is empty when the two bins are identical or adjacent,
        # so no explicit distance check is needed.
        for index in range(start + step, stop, step):
            record(index, src_label)
    return bins
# ══════════════════════════════════════════════════════════════════════════════
# 4b. UP/DOWN-BIN FOOTPRINT PROFILE CONSTRUCTION
# ══════════════════════════════════════════════════════════════════════════════
def build_updown_profile(datapoints, beta=BIN_SIZE):
    """Build the Up/Down-Bin Footprint Profile.

    Labels are grouped per bin exactly as in the gap-filled profile: each
    datapoint in its own bin, plus the source label in every bin strictly
    between two consecutive datapoints.

    Directional counts are taken per consecutive pair:
      * different bins: every bin after the source bin, up to and including
        the destination bin, gets one up (rising) or one down (falling);
      * same bin: that bin gets one up or one down when the price itself
        rose or fell, and nothing when the price is unchanged.

    Args:
        datapoints: Sequence of ``(label, trade, price)`` tuples.
        beta: Bin size used by ``bin_index``.

    Returns:
        dict mapping bin index -> ``{"labels": [str], "up": int, "down": int}``
        with ``labels`` sorted.
    """
    labels_by_bin = {}
    ups = {}
    downs = {}

    # Own-bin placement of every datapoint.
    for label, _trade, price in datapoints:
        labels_by_bin.setdefault(bin_index(price, beta), []).append(label)

    # One walk over the consecutive pairs does both the label gap-fill and
    # the directional tallies.
    for position in range(1, len(datapoints)):
        src_label, _, src_price = datapoints[position - 1]
        _, _, dst_price = datapoints[position]
        start = bin_index(src_price, beta)
        stop = bin_index(dst_price, beta)

        if start == stop:
            # No bin change: classify by the raw price move instead.
            if dst_price > src_price:
                ups[start] = ups.get(start, 0) + 1
            elif dst_price < src_price:
                downs[start] = downs.get(start, 0) + 1
            continue

        rising = stop > start
        step = 1 if rising else -1
        tally = ups if rising else downs
        # Visits start+step ... stop (destination included).
        for index in range(start + step, stop + step, step):
            if index != stop:
                # Strictly intermediate bin: gap-fill with the source label.
                labels_by_bin.setdefault(index, []).append(src_label)
            tally[index] = tally.get(index, 0) + 1

    every_bin = set(labels_by_bin.keys()) | set(ups.keys()) | set(downs.keys())
    return {
        b: {
            "labels": sorted(labels_by_bin.get(b, [])),
            "up": ups.get(b, 0),
            "down": downs.get(b, 0),
        }
        for b in every_bin
    }
# ══════════════════════════════════════════════════════════════════════════════
# 5. CSV OUTPUT
# ══════════════════════════════════════════════════════════════════════════════
def write_datapoints_csv(datapoints, path="datapoints.csv"):
    """Dump the raw datapoints to *path* as CSV.

    A header row is followed by one row per ``(label, trade, price)`` tuple;
    the price is written with three decimals.  Prints an ``[OK]`` line when
    done.
    """
    header = ["datapoint", "x-axis trades (raw trades or time)", "y-axis Price"]
    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(
            [label, trade, f"{price:.3f}"] for label, trade, price in datapoints
        )
    print(f"[OK] {path}")
def write_profile_csv(profile, beta, path):
    """Write a profile (CMP or GMP) to CSV, bins numbered 1..N from lowest.

    Every bin index from the lowest to the highest key of *profile* gets a
    row, including bins absent from *profile* (written with an empty group
    and a count of 0).

    Args:
        profile: Mapping of bin index -> ``{"labels": [str], "count": int}``.
        beta: Bin size; bin ``b`` spans ``b * beta`` to ``(b + 1) * beta``.
        path: Destination file path.

    An empty *profile* prints a warning and writes nothing.
    """
    if not profile:
        print(f"[WARN] Empty profile, skipping {path}")
        return

    def bound(value):
        # ".15g" prints integral bounds without a decimal point (same output
        # as before for whole-number bin sizes) and keeps the fractional part
        # for fractional bin sizes instead of truncating it with int().
        return format(value, ".15g")

    lowest, highest = min(profile), max(profile)
    blank = {"labels": [], "count": 0}
    rows = []
    for number, b in enumerate(range(lowest, highest + 1), start=1):
        info = profile.get(b, blank)
        rows.append([
            number,
            bound(b * beta),
            bound((b + 1) * beta),
            "".join(sorted(info["labels"])),
            info["count"],
        ])

    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow([
            f"bin (with binsize = {beta} symbol's price unit)",
            "price from", "price until", "datapoint group",
            "number of profile's stacks"
        ])
        writer.writerows(rows)
    print(f"[OK] {path}")
def write_updown_profile_csv(updown_profile, gmp_groups, beta, path):
    """Write the Up/Down-Bin Footprint Profile to CSV.

    Every bin index from the lowest to the highest key of *updown_profile*
    gets a row (missing bins are written with an empty group and zeros).
    The delta column is ``up - down``.

    Args:
        updown_profile: Mapping of bin index ->
            ``{"labels": [str], "up": int, "down": int}``.
        gmp_groups: Not read by this function; kept so existing callers
            keep working.
        beta: Bin size; bin ``b`` spans ``b * beta`` to ``(b + 1) * beta``.
        path: Destination file path.

    An empty *updown_profile* prints a warning and writes nothing.
    """
    if not updown_profile:
        print(f"[WARN] Empty profile, skipping {path}")
        return

    def bound(value):
        # ".15g" prints integral bounds without a decimal point (same output
        # as before for whole-number bin sizes) and keeps the fractional part
        # for fractional bin sizes instead of truncating it with int().
        return format(value, ".15g")

    lowest, highest = min(updown_profile), max(updown_profile)
    blank = {"labels": [], "up": 0, "down": 0}
    rows = []
    for number, b in enumerate(range(lowest, highest + 1), start=1):
        info = updown_profile.get(b, blank)
        up_val = info["up"]
        down_val = info["down"]
        rows.append([
            number,
            bound(b * beta),
            bound((b + 1) * beta),
            "".join(info["labels"]),
            down_val,
            up_val,
            up_val - down_val,
        ])

    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow([
            f"bin (with binsize = {beta} symbol's price unit)",
            "price from", "price until", "datapoint group",
            "down-bin profile's stacks", "up-bin profile's stacks",
            "delta-bin profile's stacks"
        ])
        writer.writerows(rows)
    print(f"[OK] {path}")
# ══════════════════════════════════════════════════════════════════════════════
# 6. CHART GENERATION
# ══════════════════════════════════════════════════════════════════════════════
# Color palette (white / light theme), shared by every chart function below.
CLR_BG = "#ffffff"  # axes and figure background
CLR_FG = "#1a1a1a"  # ticks, axis labels, titles
CLR_GRID = "#d0d0d0"  # grid lines and axes spines
CLR_ACCENT1 = "#1565c0" # deep blue (scatter)
CLR_ACCENT2 = "#e65100" # deep orange (CMP)
CLR_ACCENT3 = "#2e7d32" # deep green (GMP)
CLR_MUTED = "#607d8b"  # zero-delta annotation in the footprint chart
CLR_LABEL = "#333333" # label text
CHART_DPI = 300  # resolution passed to savefig for every PNG
def _apply_style(ax, title=""):
    """Give *ax* (and its figure) the shared white/light look.

    Sets the background, tick, axis-label, title, spine and grid colors from
    the module palette.  A non-empty *title* is also set as the axes title.
    """
    for surface in (ax, ax.figure):
        surface.set_facecolor(CLR_BG)
    ax.tick_params(colors=CLR_FG, labelsize=8)
    for text in (ax.xaxis.label, ax.yaxis.label, ax.title):
        text.set_color(CLR_FG)
    for spine in ax.spines.values():
        spine.set_color(CLR_GRID)
    ax.grid(True, color=CLR_GRID, linewidth=0.5, alpha=0.4)
    if not title:
        return
    ax.set_title(title, fontsize=11, fontweight="bold", pad=10)
def chart_price_scatter(datapoints, path="fig_price_scatter.png", ax=None):
    """Scatter + line plot of price vs. trade index, one label per point.

    Args:
        datapoints: Sequence of ``(label, trade, price)`` tuples.
        path: PNG destination, used only when this function owns the figure.
        ax: Optional existing axes to draw on.  When omitted, a new figure is
            created, saved to *path* and closed; when given, the caller keeps
            responsibility for saving.
    """
    labels = [point[0] for point in datapoints]
    trades = [point[1] for point in datapoints]
    prices = [point[2] for point in datapoints]

    standalone = ax is None
    if standalone:
        fig, ax = plt.subplots(figsize=(7, 4))

    # "\u2013" is an en dash; the escape keeps the title from being garbled
    # by a wrong file encoding.
    _apply_style(ax, "Price vs. Trade Index (Datapoints A\u2013J)")
    ax.plot(trades, prices, color=CLR_ACCENT1, linewidth=1.2, alpha=0.45,
            zorder=1)
    ax.scatter(trades, prices, color=CLR_ACCENT1, s=52, zorder=2,
               edgecolors="white", linewidths=0.6)
    for lbl, x, y in zip(labels, trades, prices):
        ax.annotate(lbl, (x, y), textcoords="offset points",
                    xytext=(0, 10), ha="center", fontsize=8,
                    fontweight="bold", color=CLR_LABEL)
    ax.set_xlabel("Trade Index (raw trades)", fontsize=9)
    ax.set_ylabel("Price (USD)", fontsize=9)
    ax.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

    if standalone:
        fig.tight_layout()
        fig.savefig(path, dpi=CHART_DPI, bbox_inches="tight")
        plt.close(fig)
        # Reported only when a file was actually written.
        print(f"[OK] {path}")
def _draw_profile(ax, profile, beta, title, bar_color):
    """Draw *profile* as a horizontal bar chart on *ax*.

    One bar is drawn per bin from the lowest to the highest bin index in
    *profile*; bins missing from *profile* are drawn with zero stacks.  Each
    bar with a non-empty group is annotated with its sorted datapoint labels.

    Args:
        ax: Matplotlib axes to draw on.
        profile: Mapping of bin index -> ``{"labels": [str], "count": int}``.
            Must be non-empty, since ``min``/``max`` are taken over its keys.
        beta: Bin size; bin ``b`` spans ``b * beta`` to ``(b + 1) * beta``.
        title: Axes title, forwarded to ``_apply_style``.
        bar_color: Fill color of the bars.
    """
    lowest, highest = min(profile), max(profile)
    blank = {"labels": [], "count": 0}
    bin_labels = []
    stacks = []
    groups = []
    for b in range(lowest, highest + 1):
        # ".15g" shows integral bounds without a decimal point and keeps the
        # fractional part for fractional bin sizes (int() used to drop it).
        # "\u2013" is an en dash, escaped so it survives re-encoding.
        bin_labels.append(f"{b * beta:.15g}\u2013{(b + 1) * beta:.15g}")
        info = profile.get(b, blank)
        stacks.append(info["count"])
        groups.append("".join(sorted(info["labels"])))

    y_pos = range(len(bin_labels))
    bars = ax.barh(y_pos, stacks, color=bar_color, edgecolor="white",
                   linewidth=0.5, height=0.7, alpha=0.85)
    ax.set_yticks(y_pos)
    ax.set_yticklabels(bin_labels, fontsize=7)
    ax.set_xlabel("Stacks", fontsize=9)
    ax.set_ylabel("Price Bin (USD)", fontsize=9)

    # Annotate bars with datapoint group letters, just right of the bar end.
    max_s = max(stacks) if stacks else 1
    for bar, grp in zip(bars, groups):
        if grp:
            ax.text(bar.get_width() + 0.12, bar.get_y() + bar.get_height() / 2,
                    grp, va="center", ha="left", fontsize=7, color=CLR_LABEL,
                    fontweight="bold")
    # Extra room on the right so the group letters are not clipped.
    ax.set_xlim(0, max_s + 2)
    _apply_style(ax, title)
def chart_profile(profile, beta, path, title, bar_color):
    """Render a single profile (CMP or GMP) as its own PNG at *path*.

    Nothing is drawn or written when *profile* is empty.
    """
    if profile:
        figure, axes = plt.subplots(figsize=(6, 5))
        _draw_profile(axes, profile, beta, title, bar_color)
        figure.tight_layout()
        figure.savefig(path, dpi=CHART_DPI, bbox_inches="tight")
        plt.close(figure)
        print(f"[OK] {path}")
def chart_cmp_vs_gmp(cmp_profile, gmp_profile, beta,
                     path="fig_cmp_vs_gmp.png"):
    """Side-by-side comparison of CMP and GMP profiles (2-panel).

    Args:
        cmp_profile: CMP mapping of bin index -> {"labels", "count"}.
        gmp_profile: GMP mapping of the same shape.
        beta: Bin size, used for the bin labels and shown in the figure title.
        path: PNG destination.

    Nothing is drawn or written when either profile is empty.
    """
    if not cmp_profile or not gmp_profile:
        return
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(11, 5), sharey=True)
    _draw_profile(ax1, cmp_profile, beta, "CMP Profile", CLR_ACCENT2)
    _draw_profile(ax2, gmp_profile, beta, "GMP Profile", CLR_ACCENT3)
    ax2.set_ylabel("")  # avoid duplicate y-label
    # "\u2014" is an em dash and "\u03b2" is Greek beta, written as escapes so
    # they survive re-encoding.  The title reports the bin size actually
    # passed in rather than a fixed 1.
    fig.suptitle(f"CMP vs. GMP \u2014 10-Datapoint Example (\u03b2 = {beta})",
                 fontsize=13, fontweight="bold", color=CLR_FG, y=1.01)
    fig.tight_layout()
    fig.savefig(path, dpi=CHART_DPI, bbox_inches="tight")
    plt.close(fig)
    print(f"[OK] {path}")
def chart_combined_3panel(datapoints, cmp_profile, gmp_profile, beta,
                          path="fig_combined_3panel.png"):
    """Three-panel chart: Datapoints | CMP with letters | GMP with letters.

    Args:
        datapoints: Sequence of ``(label, trade, price)`` tuples.
        cmp_profile: CMP mapping of bin index -> {"labels", "count"}.
        gmp_profile: GMP mapping of the same shape.
        beta: Bin size, used for the bin labels and shown in the figure title.
        path: PNG destination.

    Nothing is drawn or written when either profile is empty.
    """
    if not cmp_profile or not gmp_profile:
        return
    fig, (ax1, ax2, ax3) = plt.subplots(
        1, 3, figsize=(16, 5.5),
        gridspec_kw={"width_ratios": [1.1, 1, 1]})

    # Panel 1: datapoints scatter with labels.
    labels = [point[0] for point in datapoints]
    trades = [point[1] for point in datapoints]
    prices = [point[2] for point in datapoints]
    # "\u2013" is an en dash, escaped so it survives re-encoding.
    _apply_style(ax1, "Datapoints (A\u2013J)")
    ax1.plot(trades, prices, color=CLR_ACCENT1, linewidth=1.2, alpha=0.4,
             zorder=1)
    ax1.scatter(trades, prices, color=CLR_ACCENT1, s=52, zorder=2,
                edgecolors="white", linewidths=0.6)
    for lbl, x, y in zip(labels, trades, prices):
        ax1.annotate(lbl, (x, y), textcoords="offset points",
                     xytext=(0, 10), ha="center", fontsize=9,
                     fontweight="bold", color=CLR_LABEL)
    ax1.set_xlabel("Trade Index", fontsize=9)
    ax1.set_ylabel("Price (USD)", fontsize=9)
    ax1.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

    # Panel 2: CMP with group letters.
    _draw_profile(ax2, cmp_profile, beta, "CMP with Letters", CLR_ACCENT2)

    # Panel 3: GMP with group letters.
    _draw_profile(ax3, gmp_profile, beta, "GMP with Letters", CLR_ACCENT3)
    ax3.set_ylabel("")  # avoid duplicate y-label

    # "\u2192" is a right arrow and "\u03b2" is Greek beta.  The title reports
    # the bin size actually passed in rather than a fixed 1.
    fig.suptitle(f"Datapoints \u2192 CMP \u2192 GMP (\u03b2 = {beta})",
                 fontsize=14, fontweight="bold", color=CLR_FG, y=1.02)
    fig.tight_layout()
    fig.savefig(path, dpi=CHART_DPI, bbox_inches="tight")
    plt.close(fig)
    print(f"[OK] {path}")
def chart_updown_footprint(updown_profile, beta,
                           path="fig_updown_footprint.png"):
    """Dual horizontal bar chart: down-bins (left/red) vs up-bins (right/teal).

    One row is drawn per bin from the lowest to the highest bin index in
    *updown_profile*; bins missing from it are drawn with zero up and down.
    Each row is also annotated with its delta (``up - down``).

    Args:
        updown_profile: Mapping of bin index ->
            ``{"labels": [str], "up": int, "down": int}``.
        beta: Bin size; bin ``b`` spans ``b * beta`` to ``(b + 1) * beta``.
        path: PNG destination.

    Nothing is drawn or written when *updown_profile* is empty.
    """
    if not updown_profile:
        return
    CLR_UP = "#00897b"    # teal
    CLR_DOWN = "#e53935"  # red

    lowest, highest = min(updown_profile), max(updown_profile)
    blank = {"labels": [], "up": 0, "down": 0}
    bin_labels = []
    up_vals = []
    down_vals = []
    delta_vals = []
    for b in range(lowest, highest + 1):
        # ".15g" shows integral bounds without a decimal point and keeps the
        # fractional part for fractional bin sizes (int() used to drop it).
        bin_labels.append(f"{b * beta:.15g}-{(b + 1) * beta:.15g}")
        info = updown_profile.get(b, blank)
        up_vals.append(info["up"])
        down_vals.append(info["down"])
        delta_vals.append(info["up"] - info["down"])

    y_pos = list(range(len(bin_labels)))
    # At least 1, so the x-limits stay sensible when every count is zero.
    max_val = max(max(up_vals, default=1), max(down_vals, default=1), 1)

    fig, ax = plt.subplots(figsize=(8, 5.5))
    _apply_style(ax, "Up/Down-Bin Footprint Profile (GMP-based)")

    # Down bars extend to the LEFT (negative x).
    ax.barh(y_pos, [-d for d in down_vals], color=CLR_DOWN,
            edgecolor="white", linewidth=0.5, height=0.65,
            alpha=0.85, label="Down-bin")
    # Up bars extend to the RIGHT (positive x).
    ax.barh(y_pos, up_vals, color=CLR_UP,
            edgecolor="white", linewidth=0.5, height=0.65,
            alpha=0.85, label="Up-bin")

    # Annotate bars with counts, and every row with its delta.
    for i, (dv, uv, deltav) in enumerate(zip(down_vals, up_vals, delta_vals)):
        if dv > 0:
            ax.text(-dv - 0.15, i, str(dv), va="center", ha="right",
                    fontsize=7, color=CLR_DOWN, fontweight="bold")
        if uv > 0:
            ax.text(uv + 0.15, i, str(uv), va="center", ha="left",
                    fontsize=7, color=CLR_UP, fontweight="bold")
        # Delta annotation at far right.
        delta_color = CLR_UP if deltav > 0 else (CLR_DOWN if deltav < 0 else CLR_MUTED)
        delta_str = f"{deltav:+d}" if deltav != 0 else "0"
        ax.text(max_val + 1.0, i, f"\u0394={delta_str}", va="center", ha="left",
                fontsize=6.5, color=delta_color)

    ax.set_yticks(y_pos)
    ax.set_yticklabels(bin_labels, fontsize=7)
    ax.set_xlabel("Stacks", fontsize=9)
    ax.set_ylabel("Price Bin (USD)", fontsize=9)
    ax.axvline(0, color=CLR_FG, linewidth=0.6)
    ax.set_xlim(-max_val - 1.5, max_val + 2.5)
    ax.legend(loc="lower right", fontsize=8)
    fig.tight_layout()
    fig.savefig(path, dpi=CHART_DPI, bbox_inches="tight")
    plt.close(fig)
    print(f"[OK] {path}")
# ══════════════════════════════════════════════════════════════════════════════
# 7. MAIN
# ══════════════════════════════════════════════════════════════════════════════
def main():
    """Build all three profiles, write the CSVs and charts, print a summary.

    Output files are written next to this script.  Charts are generated only
    when matplotlib was importable (``HAS_MPL``).
    """
    out_dir = os.path.dirname(os.path.abspath(__file__))

    def out(name):
        return os.path.join(out_dir, name)

    def span(b):
        # ".15g" shows integral bounds without a decimal point and keeps the
        # fractional part for fractional bin sizes (int() used to drop it).
        # "\u2013" is an en dash, escaped so it survives re-encoding.
        return f"{b * BIN_SIZE:.15g}\u2013{(b + 1) * BIN_SIZE:.15g}"

    # Build profiles.
    cmp_profile = build_cmp(DATAPOINTS, BIN_SIZE)
    gmp_profile = build_gmp(DATAPOINTS, BIN_SIZE)
    updown = build_updown_profile(DATAPOINTS, BIN_SIZE)

    # Write CSVs.
    write_datapoints_csv(DATAPOINTS, out("datapoints.csv"))
    write_profile_csv(cmp_profile, BIN_SIZE, out("cmp_profile.csv"))
    write_profile_csv(gmp_profile, BIN_SIZE, out("gmp_profile.csv"))
    write_updown_profile_csv(updown, gmp_profile, BIN_SIZE,
                             out("updown_profile.csv"))

    # Generate charts.
    if HAS_MPL:
        chart_price_scatter(DATAPOINTS, out("fig_price_scatter.png"))
        chart_profile(cmp_profile, BIN_SIZE, out("fig_cmp_profile.png"),
                      "Conventional Market Profile (CMP)", CLR_ACCENT2)
        chart_profile(gmp_profile, BIN_SIZE, out("fig_gmp_profile.png"),
                      "Gap-Filled Market Profile (GMP)", CLR_ACCENT3)
        chart_cmp_vs_gmp(cmp_profile, gmp_profile, BIN_SIZE,
                         out("fig_cmp_vs_gmp.png"))
        chart_combined_3panel(DATAPOINTS, cmp_profile, gmp_profile, BIN_SIZE,
                              out("fig_combined_3panel.png"))
        chart_updown_footprint(updown, BIN_SIZE,
                               out("fig_updown_footprint.png"))

    # Print summary.  The decorations are escapes (box-drawing rule, em dash)
    # so the console output is not garbled by a wrong file encoding; they are
    # bound to names because backslashes are not allowed inside f-string
    # expressions before Python 3.12.
    rule = "\u2500\u2500"
    em_dash = "\u2014"

    for heading, profile in (("CMP Profile", cmp_profile),
                             ("GMP Profile", gmp_profile)):
        print(f"\n{rule} {heading} {rule}")
        first = min(profile.keys())
        last = max(profile.keys())
        for b in range(first, last + 1):
            info = profile.get(b, {"labels": [], "count": 0})
            grp = "".join(sorted(info["labels"]))
            print(f" Bin {b - first + 1}: {span(b)} "
                  f"group={grp or em_dash:6s} stacks={info['count']}")

    print(f"\n{rule} Up/Down-Bin Footprint Profile {rule}")
    first = min(updown.keys())
    last = max(updown.keys())
    for b in range(first, last + 1):
        info = updown.get(b, {"labels": [], "up": 0, "down": 0})
        grp = "".join(info["labels"])
        delta = info["up"] - info["down"]
        print(f" Bin {b - first + 1}: {span(b)} "
              f"group={grp or em_dash:6s} up={info['up']} down={info['down']} "
              f"delta={delta:+d}")
# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()