#!/usr/bin/env python3
"""
generate_profiles.py — mBA-GMP.v3 Dataframe & Chart Generator
==============================================================
Produces CSV files and publication-quality PNG charts demonstrating the
Conventional Market Profile (CMP), Gap-filled Market Profile (GMP), and
Up/Down-Bin Footprint Profile using a 10-datapoint XAUUSD example.

Outputs:
    CSV: datapoints.csv, cmp_profile.csv, gmp_profile.csv, updown_profile.csv
    PNG: fig_price_scatter.png, fig_cmp_profile.png, fig_gmp_profile.png,
         fig_cmp_vs_gmp.png, fig_combined_3panel.png,
         fig_updown_footprint.png

All files are written next to this script.  PNG output is skipped (with a
warning) when matplotlib is not installed.
"""

import csv
import math
import os

# ── Try to import optional plotting libs ─────────────────────────────────────
try:
    import matplotlib
    matplotlib.use("Agg")  # non-interactive backend
    import matplotlib.pyplot as plt
    import matplotlib.ticker as ticker
    HAS_MPL = True
except ImportError:
    HAS_MPL = False
    print("[WARN] matplotlib not found – CSV files will still be generated "
          "but PNG charts will be skipped.")


# ══════════════════════════════════════════════════════════════════════════════
# 1. RAW DATAPOINTS
# ══════════════════════════════════════════════════════════════════════════════

# Each entry is (label, trade index, price).
DATAPOINTS = [
    ("A", 1, 3000.914),
    ("B", 2, 3003.837),
    ("C", 3, 3002.432),
    ("D", 4, 3009.892),
    ("E", 5, 3007.698),
    ("F", 6, 3009.176),
    ("G", 7, 3003.381),
    ("H", 8, 3004.283),
    ("I", 9, 3003.512),
    ("J", 10, 3003.012),
]

BIN_SIZE = 1  # β = 1 symbol price unit


# ══════════════════════════════════════════════════════════════════════════════
# 2. HELPER FUNCTIONS
# ══════════════════════════════════════════════════════════════════════════════

def bin_index(price: float, beta: float = BIN_SIZE) -> int:
    """Return the bin index for a given price: floor(price / β).

    The division is ordinary floating-point division, so with a fractional β
    a price lying exactly on a bin edge may land in the neighbouring bin.
    """
    return int(math.floor(price / beta))


def bin_range(price: float, beta: float = BIN_SIZE):
    """Return (price_from, price_until) for the bin containing *price*."""
    return make_bin_key(bin_index(price, beta), beta)


def make_bin_key(b: int, beta: float = BIN_SIZE):
    """Return (price_from, price_until) for bin index *b*."""
    return (b * beta, (b + 1) * beta)


def _format_edge(value):
    """Render a bin edge for output: whole numbers as int, others as float.

    Edges are rounded to 10 decimals first so float noise from ``b * beta``
    (e.g. 3000.9000000000005) does not leak into CSVs or tick labels.
    """
    rounded = round(value, 10)
    return int(rounded) if rounded == int(rounded) else rounded


def _path_bins(b_from: int, b_to: int, include_destination: bool = False):
    """Return the bins walked from *b_from* towards *b_to*, source excluded.

    The destination bin is excluded unless *include_destination* is true.
    The result is empty when both indices are equal, and (without the
    destination) also when they are adjacent.
    """
    step = 1 if b_to >= b_from else -1
    stop = b_to + step if include_destination else b_to
    return range(b_from + step, stop, step)


def _stack(profile: dict, b: int, label: str) -> None:
    """Append *label* to bin *b* of *profile* and bump its stack count."""
    entry = profile.setdefault(b, {"labels": [], "count": 0})
    entry["labels"].append(label)
    entry["count"] += 1


# ══════════════════════════════════════════════════════════════════════════════
# 3. CMP CONSTRUCTION
# ══════════════════════════════════════════════════════════════════════════════

def build_cmp(datapoints, beta=BIN_SIZE):
    """
    Build CMP profile: every datapoint is stacked into its own price bin.

    *datapoints* is an iterable of (label, trade, price) tuples.

    Returns dict: bin_index -> {"labels": [str], "count": int}
    """
    profile = {}
    for label, _trade, price in datapoints:
        _stack(profile, bin_index(price, beta), label)
    return profile


# ══════════════════════════════════════════════════════════════════════════════
# 4. GMP CONSTRUCTION
# ══════════════════════════════════════════════════════════════════════════════

def build_gmp(datapoints, beta=BIN_SIZE):
    """
    Build GMP profile (gap-filled).

    Convention (matches the dataframe approach):
      1. Every datapoint fills its OWN bin with its own label (same as CMP).
      2. For each consecutive pair (i, i+1), the intermediate bins BETWEEN
         b(p_i) and b(p_{i+1}) — exclusive of both endpoints — are filled
         with the SOURCE datapoint's label (datapoint i).

    Within a bin, own-bin labels come first (in datapoint order), followed by
    gap-fill labels (in pair order).

    Returns dict: bin_index -> {"labels": [str], "count": int}
    """
    points = list(datapoints)  # allow any iterable; we need two passes

    # ── Step 1: CMP-style placement — each datapoint fills its own bin ──
    profile = build_cmp(points, beta)

    # ── Step 2: Gap-fill intermediate bins between consecutive pairs ─────
    for (src_label, _, src_price), (_, _, dst_price) in zip(points, points[1:]):
        b_from = bin_index(src_price, beta)
        b_to = bin_index(dst_price, beta)
        # Empty for same/adjacent bins, so those pairs add nothing.
        for b in _path_bins(b_from, b_to):
            _stack(profile, b, src_label)

    return profile


# ══════════════════════════════════════════════════════════════════════════════
# 4b. UP/DOWN-BIN FOOTPRINT PROFILE CONSTRUCTION
# ══════════════════════════════════════════════════════════════════════════════

def build_updown_profile(datapoints, beta=BIN_SIZE):
    """
    Build the Up/Down-Bin Footprint Profile.

    For each consecutive pair of datapoints, every bin on the gap-filled
    path (excluding the source datapoint's own bin, including the
    destination's) is counted as an up-bin or down-bin depending on the
    direction of the move.  When both datapoints share a bin, that bin gets
    one up/down count according to the price change (none if unchanged).
    The first datapoint (no prior movement) contributes no count.

    Returns dict: bin_index -> {"labels": [str], "up": int, "down": int}
    where "labels" is the sorted GMP label group of the bin.
    """
    points = list(datapoints)

    # ── GMP group labels come straight from the GMP construction ─────────
    groups = {b: info["labels"] for b, info in build_gmp(points, beta).items()}

    # ── Directional counts per bin ────────────────────────────────────────
    up_counts = {}    # bin_index -> int
    down_counts = {}  # bin_index -> int

    for (_, _, src_price), (_, _, dst_price) in zip(points, points[1:]):
        b_from = bin_index(src_price, beta)
        b_to = bin_index(dst_price, beta)

        if b_from == b_to:
            # Same bin, but price might have moved within it.
            if dst_price > src_price:
                counts = up_counts
            elif dst_price < src_price:
                counts = down_counts
            else:
                continue
            touched = (b_from,)
        else:
            counts = up_counts if b_to > b_from else down_counts
            touched = _path_bins(b_from, b_to, include_destination=True)

        for b in touched:
            counts[b] = counts.get(b, 0) + 1

    # ── Merge into result dict ────────────────────────────────────────────
    all_bins = set(groups) | set(up_counts) | set(down_counts)
    return {
        b: {
            "labels": sorted(groups.get(b, [])),
            "up": up_counts.get(b, 0),
            "down": down_counts.get(b, 0),
        }
        for b in all_bins
    }


# ══════════════════════════════════════════════════════════════════════════════
# 5. CSV OUTPUT
# ══════════════════════════════════════════════════════════════════════════════

def _write_csv(path, header, rows):
    """Write *header* followed by *rows* to *path* and report success."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    print(f"[OK] {path}")


def write_datapoints_csv(datapoints, path="datapoints.csv"):
    """Write the raw datapoints to CSV (prices with three decimals)."""
    _write_csv(
        path,
        ["datapoint", "x-axis trades (raw trades or time)", "y-axis Price"],
        [[label, trade, f"{price:.3f}"] for label, trade, price in datapoints],
    )


def write_profile_csv(profile, beta, path):
    """Write a profile (CMP or GMP) to CSV, bins numbered 1..N from lowest.

    Every bin between the lowest and highest populated bin gets a row, even
    when it is empty.  Nothing is written for an empty profile.
    """
    if not profile:
        print(f"[WARN] Empty profile, skipping {path}")
        return

    b_min = min(profile)
    b_max = max(profile)

    rows = []
    for bin_number, b in enumerate(range(b_min, b_max + 1), start=1):
        p_from, p_until = make_bin_key(b, beta)
        info = profile.get(b, {"labels": [], "count": 0})
        rows.append([
            bin_number,
            _format_edge(p_from),
            _format_edge(p_until),
            "".join(sorted(info["labels"])),
            info["count"],
        ])

    _write_csv(
        path,
        [
            f"bin (with binsize = {beta} symbol's price unit)",
            "price from",
            "price until",
            "datapoint group",
            "number of profile's stacks",
        ],
        rows,
    )


def write_updown_profile_csv(updown_profile, gmp_groups, beta, path):
    """Write the Up/Down-Bin Footprint Profile to CSV.

    *gmp_groups* is accepted for interface compatibility but not used: the
    label groups are already part of *updown_profile*.  Nothing is written
    for an empty profile.
    """
    if not updown_profile:
        print(f"[WARN] Empty profile, skipping {path}")
        return

    b_min = min(updown_profile)
    b_max = max(updown_profile)

    rows = []
    for bin_number, b in enumerate(range(b_min, b_max + 1), start=1):
        p_from, p_until = make_bin_key(b, beta)
        info = updown_profile.get(b, {"labels": [], "up": 0, "down": 0})
        up_val = info["up"]
        down_val = info["down"]
        rows.append([
            bin_number,
            _format_edge(p_from),
            _format_edge(p_until),
            "".join(info["labels"]),
            down_val,
            up_val,
            up_val - down_val,
        ])

    _write_csv(
        path,
        [
            f"bin (with binsize = {beta} symbol's price unit)",
            "price from",
            "price until",
            "datapoint group",
            "down-bin profile's stacks",
            "up-bin profile's stacks",
            "delta-bin profile's stacks",
        ],
        rows,
    )


# ══════════════════════════════════════════════════════════════════════════════
# 6. CHART GENERATION
# ══════════════════════════════════════════════════════════════════════════════

# ── Color palette (white / light theme) ──────────────────────────────────────
CLR_BG = "#ffffff"
CLR_FG = "#1a1a1a"
CLR_GRID = "#d0d0d0"
CLR_ACCENT1 = "#1565c0"  # deep blue (scatter)
CLR_ACCENT2 = "#e65100"  # deep orange (CMP)
CLR_ACCENT3 = "#2e7d32"  # deep green (GMP)
CLR_MUTED = "#607d8b"
CLR_LABEL = "#333333"    # label text

CHART_DPI = 300


def _apply_style(ax, title=""):
    """Apply a consistent white/light theme to an axes object."""
    ax.set_facecolor(CLR_BG)
    ax.figure.set_facecolor(CLR_BG)
    ax.tick_params(colors=CLR_FG, labelsize=8)
    ax.xaxis.label.set_color(CLR_FG)
    ax.yaxis.label.set_color(CLR_FG)
    ax.title.set_color(CLR_FG)
    for spine in ax.spines.values():
        spine.set_color(CLR_GRID)
    ax.grid(True, color=CLR_GRID, linewidth=0.5, alpha=0.4)
    if title:
        ax.set_title(title, fontsize=11, fontweight="bold", pad=10)


def _save_figure(fig, path):
    """Lay out, save and close *fig*, then report the written file."""
    fig.tight_layout()
    fig.savefig(path, dpi=CHART_DPI, bbox_inches="tight")
    plt.close(fig)
    print(f"[OK] {path}")


def chart_price_scatter(datapoints, path="fig_price_scatter.png", ax=None):
    """Scatter + line plot of price vs trade index, labeled A–J.

    When *ax* is given the plot is drawn onto it and nothing is saved;
    otherwise a new figure is created and written to *path*.
    """
    labels = [d[0] for d in datapoints]
    trades = [d[1] for d in datapoints]
    prices = [d[2] for d in datapoints]

    standalone = ax is None
    if standalone:
        fig, ax = plt.subplots(figsize=(7, 4))

    _apply_style(ax, "Price vs. Trade Index (Datapoints A–J)")
    ax.plot(trades, prices, color=CLR_ACCENT1, linewidth=1.2, alpha=0.45,
            zorder=1)
    ax.scatter(trades, prices, color=CLR_ACCENT1, s=52, zorder=2,
               edgecolors="white", linewidths=0.6)

    for lbl, x, y in zip(labels, trades, prices):
        ax.annotate(lbl, (x, y), textcoords="offset points", xytext=(0, 10),
                    ha="center", fontsize=8, fontweight="bold",
                    color=CLR_LABEL)

    ax.set_xlabel("Trade Index (raw trades)", fontsize=9)
    ax.set_ylabel("Price (USD)", fontsize=9)
    ax.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

    if standalone:
        _save_figure(fig, path)


def _draw_profile(ax, profile, beta, title, bar_color):
    """Draw a horizontal bar chart for a non-empty profile onto *ax*."""
    b_min = min(profile)
    b_max = max(profile)

    bin_labels = []
    stacks = []
    groups = []
    for b in range(b_min, b_max + 1):
        p_from, p_until = make_bin_key(b, beta)
        bin_labels.append(f"{_format_edge(p_from)}–{_format_edge(p_until)}")
        info = profile.get(b, {"labels": [], "count": 0})
        stacks.append(info["count"])
        groups.append("".join(sorted(info["labels"])))

    y_pos = range(len(bin_labels))
    bars = ax.barh(y_pos, stacks, color=bar_color, edgecolor="white",
                   linewidth=0.5, height=0.7, alpha=0.85)

    ax.set_yticks(y_pos)
    ax.set_yticklabels(bin_labels, fontsize=7)
    ax.set_xlabel("Stacks", fontsize=9)
    ax.set_ylabel("Price Bin (USD)", fontsize=9)

    # Annotate bars with datapoint group letters
    for bar, grp in zip(bars, groups):
        if grp:
            ax.text(bar.get_width() + 0.12,
                    bar.get_y() + bar.get_height() / 2,
                    grp, va="center", ha="left", fontsize=7,
                    color=CLR_LABEL, fontweight="bold")

    # Leave headroom to the right of the longest bar for the group letters.
    max_s = max(stacks) if stacks else 1
    ax.set_xlim(0, max_s + 2)
    _apply_style(ax, title)


def chart_profile(profile, beta, path, title, bar_color):
    """Standalone horizontal bar chart for a single profile (CMP or GMP)."""
    if not profile:
        return

    fig, ax = plt.subplots(figsize=(6, 5))
    _draw_profile(ax, profile, beta, title, bar_color)
    _save_figure(fig, path)


def chart_cmp_vs_gmp(cmp_profile, gmp_profile, beta,
                     path="fig_cmp_vs_gmp.png"):
    """Side-by-side comparison of CMP and GMP profiles (2-panel)."""
    if not cmp_profile or not gmp_profile:
        return

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(11, 5), sharey=True)
    _draw_profile(ax1, cmp_profile, beta, "CMP Profile", CLR_ACCENT2)
    _draw_profile(ax2, gmp_profile, beta, "GMP Profile", CLR_ACCENT3)
    ax2.set_ylabel("")  # avoid duplicate y-label

    fig.suptitle("CMP vs. GMP — 10-Datapoint Example (β = 1)",
                 fontsize=13, fontweight="bold", color=CLR_FG, y=1.01)
    _save_figure(fig, path)


def chart_combined_3panel(datapoints, cmp_profile, gmp_profile, beta,
                          path="fig_combined_3panel.png"):
    """Three-panel chart: Datapoints | CMP with letters | GMP with letters."""
    if not cmp_profile or not gmp_profile:
        return

    fig, (ax1, ax2, ax3) = plt.subplots(
        1, 3, figsize=(16, 5.5),
        gridspec_kw={"width_ratios": [1.1, 1, 1]})

    # ── Panel 1: Datapoints scatter with labels ──────────────────────────
    labels = [d[0] for d in datapoints]
    trades = [d[1] for d in datapoints]
    prices = [d[2] for d in datapoints]

    _apply_style(ax1, "Datapoints (A–J)")
    ax1.plot(trades, prices, color=CLR_ACCENT1, linewidth=1.2, alpha=0.4,
             zorder=1)
    ax1.scatter(trades, prices, color=CLR_ACCENT1, s=52, zorder=2,
                edgecolors="white", linewidths=0.6)
    for lbl, x, y in zip(labels, trades, prices):
        ax1.annotate(lbl, (x, y), textcoords="offset points",
                     xytext=(0, 10), ha="center", fontsize=9,
                     fontweight="bold", color=CLR_LABEL)
    ax1.set_xlabel("Trade Index", fontsize=9)
    ax1.set_ylabel("Price (USD)", fontsize=9)
    ax1.yaxis.set_major_formatter(ticker.FormatStrFormatter("%.0f"))

    # ── Panel 2: CMP with group letters ──────────────────────────────────
    _draw_profile(ax2, cmp_profile, beta, "CMP with Letters", CLR_ACCENT2)

    # ── Panel 3: GMP with group letters ──────────────────────────────────
    _draw_profile(ax3, gmp_profile, beta, "GMP with Letters", CLR_ACCENT3)
    ax3.set_ylabel("")  # avoid duplicate y-label

    fig.suptitle("Datapoints → CMP → GMP (β = 1)",
                 fontsize=14, fontweight="bold", color=CLR_FG, y=1.02)
    _save_figure(fig, path)


def chart_updown_footprint(updown_profile, beta,
                           path="fig_updown_footprint.png"):
    """Dual horizontal bar chart: down-bins (left/red) vs up-bins (right/teal)."""
    if not updown_profile:
        return

    CLR_UP = "#00897b"    # teal
    CLR_DOWN = "#e53935"  # red

    b_min = min(updown_profile)
    b_max = max(updown_profile)

    bin_labels = []
    up_vals = []
    down_vals = []
    delta_vals = []
    for b in range(b_min, b_max + 1):
        p_from, p_until = make_bin_key(b, beta)
        bin_labels.append(f"{_format_edge(p_from)}-{_format_edge(p_until)}")
        info = updown_profile.get(b, {"labels": [], "up": 0, "down": 0})
        up_vals.append(info["up"])
        down_vals.append(info["down"])
        delta_vals.append(info["up"] - info["down"])

    y_pos = list(range(len(bin_labels)))
    # Never below 1 so the x-limits stay sensible when all counts are zero.
    max_val = max(max(up_vals, default=1), max(down_vals, default=1), 1)

    fig, ax = plt.subplots(figsize=(8, 5.5))
    _apply_style(ax, "Up/Down-Bin Footprint Profile (GMP-based)")

    # Down bars extend to the LEFT (negative x)
    ax.barh(y_pos, [-d for d in down_vals], color=CLR_DOWN,
            edgecolor="white", linewidth=0.5, height=0.65, alpha=0.85,
            label="Down-bin")

    # Up bars extend to the RIGHT (positive x)
    ax.barh(y_pos, up_vals, color=CLR_UP,
            edgecolor="white", linewidth=0.5, height=0.65, alpha=0.85,
            label="Up-bin")

    # Annotate bars with counts
    for i, (dv, uv, deltav) in enumerate(zip(down_vals, up_vals, delta_vals)):
        if dv > 0:
            ax.text(-dv - 0.15, i, str(dv), va="center", ha="right",
                    fontsize=7, color=CLR_DOWN, fontweight="bold")
        if uv > 0:
            ax.text(uv + 0.15, i, str(uv), va="center", ha="left",
                    fontsize=7, color=CLR_UP, fontweight="bold")

        # Delta annotation at far right
        if deltav > 0:
            delta_color = CLR_UP
        elif deltav < 0:
            delta_color = CLR_DOWN
        else:
            delta_color = CLR_MUTED
        delta_str = f"{deltav:+d}" if deltav != 0 else "0"
        ax.text(max_val + 1.0, i, f"\u0394={delta_str}", va="center",
                ha="left", fontsize=6.5, color=delta_color)

    ax.set_yticks(y_pos)
    ax.set_yticklabels(bin_labels, fontsize=7)
    ax.set_xlabel("Stacks", fontsize=9)
    ax.set_ylabel("Price Bin (USD)", fontsize=9)
    ax.axvline(0, color=CLR_FG, linewidth=0.6)
    ax.set_xlim(-max_val - 1.5, max_val + 2.5)
    ax.legend(loc="lower right", fontsize=8)

    _save_figure(fig, path)


# ══════════════════════════════════════════════════════════════════════════════
# 7. MAIN
# ══════════════════════════════════════════════════════════════════════════════

def _print_stack_summary(heading, profile, beta):
    """Print one line per bin of a CMP/GMP profile (no lines if empty)."""
    print(heading)
    if not profile:
        return
    b_min = min(profile)
    b_max = max(profile)
    for b in range(b_min, b_max + 1):
        p_from, p_until = make_bin_key(b, beta)
        info = profile.get(b, {"labels": [], "count": 0})
        grp = "".join(sorted(info["labels"]))
        print(f" Bin {b - b_min + 1}: "
              f"{_format_edge(p_from)}–{_format_edge(p_until)} "
              f"group={grp or '—':6s} stacks={info['count']}")


def _print_updown_summary(heading, profile, beta):
    """Print one line per bin of an up/down profile (no lines if empty)."""
    print(heading)
    if not profile:
        return
    b_min = min(profile)
    b_max = max(profile)
    for b in range(b_min, b_max + 1):
        p_from, p_until = make_bin_key(b, beta)
        info = profile.get(b, {"labels": [], "up": 0, "down": 0})
        grp = "".join(info["labels"])
        delta = info["up"] - info["down"]
        print(f" Bin {b - b_min + 1}: "
              f"{_format_edge(p_from)}–{_format_edge(p_until)} "
              f"group={grp or '—':6s} up={info['up']} down={info['down']} "
              f"delta={delta:+d}")


def main():
    """Build all profiles, write CSVs and charts, and print a summary."""
    out_dir = os.path.dirname(os.path.abspath(__file__))

    # ── Build profiles ────────────────────────────────────────────────────
    cmp = build_cmp(DATAPOINTS, BIN_SIZE)
    gmp = build_gmp(DATAPOINTS, BIN_SIZE)
    updown = build_updown_profile(DATAPOINTS, BIN_SIZE)

    # ── Write CSVs ────────────────────────────────────────────────────────
    write_datapoints_csv(DATAPOINTS, os.path.join(out_dir, "datapoints.csv"))
    write_profile_csv(cmp, BIN_SIZE, os.path.join(out_dir, "cmp_profile.csv"))
    write_profile_csv(gmp, BIN_SIZE, os.path.join(out_dir, "gmp_profile.csv"))
    write_updown_profile_csv(updown, gmp, BIN_SIZE,
                             os.path.join(out_dir, "updown_profile.csv"))

    # ── Generate charts ───────────────────────────────────────────────────
    if HAS_MPL:
        chart_price_scatter(
            DATAPOINTS,
            os.path.join(out_dir, "fig_price_scatter.png"))
        chart_profile(
            cmp, BIN_SIZE,
            os.path.join(out_dir, "fig_cmp_profile.png"),
            "Conventional Market Profile (CMP)", CLR_ACCENT2)
        chart_profile(
            gmp, BIN_SIZE,
            os.path.join(out_dir, "fig_gmp_profile.png"),
            "Gap-Filled Market Profile (GMP)", CLR_ACCENT3)
        chart_cmp_vs_gmp(
            cmp, gmp, BIN_SIZE,
            os.path.join(out_dir, "fig_cmp_vs_gmp.png"))
        chart_combined_3panel(
            DATAPOINTS, cmp, gmp, BIN_SIZE,
            os.path.join(out_dir, "fig_combined_3panel.png"))
        chart_updown_footprint(
            updown, BIN_SIZE,
            os.path.join(out_dir, "fig_updown_footprint.png"))

    # ── Print summary ─────────────────────────────────────────────────────
    _print_stack_summary("\n── CMP Profile ──", cmp, BIN_SIZE)
    _print_stack_summary("\n── GMP Profile ──", gmp, BIN_SIZE)
    _print_updown_summary("\n── Up/Down-Bin Footprint Profile ──",
                          updown, BIN_SIZE)


if __name__ == "__main__":
    main()