Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| from pathlib import Path | |
| import re | |
| import json | |
| from collections import Counter | |
| from datetime import datetime | |
| import random | |
| import os | |
| import time | |
| # ------------------------- | |
| # CSV freshness guardrails (UI-only, no layout/color changes) | |
| # ------------------------- | |
| def _human_age(seconds: float) -> str: | |
| try: | |
| s = int(max(0, seconds)) | |
| except Exception: | |
| return "unknown" | |
| if s < 60: | |
| return f"{s}s" | |
| m, s = divmod(s, 60) | |
| if m < 60: | |
| return f"{m}m {s}s" | |
| h, m = divmod(m, 60) | |
| if h < 24: | |
| return f"{h}h {m}m" | |
| d, h = divmod(h, 24) | |
| return f"{d}d {h}h" | |
| def _csv_signature(path: str): | |
| """Fast CSV signature: (exists, mtime_ns, size).""" | |
| try: | |
| stat = os.stat(path) | |
| return (True, stat.st_mtime_ns, stat.st_size) | |
| except FileNotFoundError: | |
| return (False, None, None) | |
| except Exception: | |
| return (False, None, None) | |
| def _csv_age_seconds(path: str): | |
| try: | |
| stat = os.stat(path) | |
| return max(0.0, time.time() - float(stat.st_mtime)) | |
| except Exception: | |
| return None | |
def csv_changed_warning(csv_paths, label="CSV"):
    """Warn in the UI when a tracked CSV differs from its stored signature.

    Signatures are kept in st.session_state["csv_sigs"], keyed by path.
    A path seen for the first time in this session is recorded but not
    reported as changed.

    Returns:
        (changed_any, changed_list): a bool and the list of changed paths.
    """
    if "csv_sigs" not in st.session_state:
        st.session_state["csv_sigs"] = {}
    known = st.session_state["csv_sigs"]

    changed_list = []
    for path in csv_paths:
        current = _csv_signature(path)
        previous = known.get(path)
        # Always remember the latest signature; only a differing, previously
        # recorded signature counts as a change.
        known[path] = current
        if previous is not None and current != previous:
            changed_list.append(path)

    changed_any = bool(changed_list)
    if changed_any:
        bullets = "\n".join(f"• {os.path.basename(x)}" for x in changed_list)
        st.warning(
            f"⚠️ {label} updated since last run. "
            f"Soft restart recommended for clean recency windows.\n\n"
            + bullets
        )
    return changed_any, changed_list
def soft_restart_app():
    """Best-effort in-app restart: clear cached data, wipe session state, rerun.

    Each step is attempted independently; an Exception raised by one step
    is ignored so the remaining steps still run.
    """
    def _clear_cache():
        st.cache_data.clear()

    def _wipe_session():
        # Snapshot the keys first so deletion does not disturb iteration.
        for key in list(st.session_state.keys()):
            del st.session_state[key]

    def _rerun():
        st.rerun()

    for step in (_clear_cache, _wipe_session, _rerun):
        try:
            step()
        except Exception:
            pass
# --- Gimme5 Default Mode (UI-only, safe) ---
# Module-level flag, initialised to False.
# NOTE(review): no reader of this flag is visible in this part of the file —
# confirm it is still consumed somewhere before relying on it.
g5_default_mode = False
def _fmt_ticket_star(star, cfg=None):
    """Return ``star`` as an int if it is displayable, otherwise None.

    A star is displayable when it converts with int() and, if ``cfg``
    carries both ``star_min`` and ``star_max``, lies inside that range.
    """
    if star is None:
        return None
    try:
        value = int(star)
        lo = getattr(cfg, "star_min", None) if cfg is not None else None
        hi = getattr(cfg, "star_max", None) if cfg is not None else None
        if lo is not None and hi is not None and not (int(lo) <= value <= int(hi)):
            return None
    except Exception:
        # Deliberate best-effort: a malformed star or range must never break
        # ticket display; the ticket is simply shown without a bonus ball.
        return None
    return value


def fmt_ticket(nums, star=None, cfg=None):
    """Format a ticket for display.

    Accepts:
    - list/tuple of numbers
    - dict with keys like {'numbers':[...], 'star':...} (or bonus/power)
    - pre-formatted string

    Args:
        nums: the ticket in one of the shapes above; None yields "".
        star: explicit bonus ball. When None and ``nums`` is a dict, the
            first of star/bonus/power/pb/mb/sb/lucky holding an integer
            (including non-builtin integral types such as numpy integers)
            or a string is used.
        cfg: optional game config. ``main_min``/``main_max`` filter the main
            numbers; ``star_min``/``star_max`` suppress an out-of-range bonus.

    Returns:
        "n1-n2-..." optionally followed by " (⭐ star)"; "" for an empty ticket.
    """
    import numbers  # local import keeps this block self-contained

    if nums is None:
        return ""

    # If dict ticket, prefer its own star/bonus/power unless an explicit star was provided
    if isinstance(nums, dict):
        if star is None:
            for key in ("star", "bonus", "power", "pb", "mb", "sb", "lucky"):
                candidate = nums.get(key)
                # numbers.Integral also covers numpy integer scalars, which
                # a plain `int` check would silently reject.
                if key in nums and isinstance(candidate, (numbers.Integral, str)):
                    star = candidate
                    break
        nums = nums.get("numbers") or nums.get("nums") or nums.get("main") or []

    if isinstance(nums, str):
        # Already formatted (rare but supported): keep the text as-is.
        text = nums.strip()
    else:
        clean = []
        for x in (nums or []):
            try:
                clean.append(int(x))
            except Exception:
                # Skip entries that are not numeric.
                continue
        # Clamp main range if cfg provided (safe-guard)
        if cfg is not None and getattr(cfg, "main_min", None) is not None and getattr(cfg, "main_max", None) is not None:
            try:
                mn, mx = int(cfg.main_min), int(cfg.main_max)
                clean = [n for n in clean if mn <= n <= mx]
            except Exception:
                pass
        text = "-".join(str(n) for n in clean)

    if not text:
        return ""

    # Attach bonus ball if present (and in-range for this game)
    bonus = _fmt_ticket_star(star, cfg)
    if bonus is not None:
        return f"{text} (⭐ {bonus})"
    return text
| def _is_bad_run(nums, run_len=4): | |
| s = sorted(set(int(x) for x in nums)) | |
| streak = 1 | |
| for i in range(1, len(s)): | |
| if s[i] == s[i-1] + 1: | |
| streak += 1 | |
| if streak >= run_len: | |
| return True | |
| else: | |
| streak = 1 | |
| return False | |
def build_anchor_spread_ticket(god_sets, *, main_min:int, main_max:int, anchor_min:int, anchor_max:int, seed:int=0):
    """Build one 5-number ticket: an anchor from a band plus a mid/high spread.

    Steps:
      1. Pool every in-range number found under "numbers" in ``god_sets``
         and count frequencies; the 7 most common form the "core".
      2. The anchor is the least frequent number of the anchor band
         (clipped to the main range) that is not in the core, or the least
         frequent band number if all of them are core numbers.
      3. Up to 2 numbers are drawn from 10–19 and the remainder from >= 20,
         with any number >= 10 and finally the whole main range as
         fallbacks. Draws use a RNG seeded with ``seed``.
      4. If the sorted ticket holds a run of 4 consecutive numbers, one
         non-first position is swapped for a high non-core number.

    Args:
        god_sets: iterable of dicts carrying a "numbers" list (may be None).
        main_min, main_max: inclusive range of valid main numbers.
        anchor_min, anchor_max: inclusive anchor band before clipping.
        seed: RNG seed; None is treated as 0.

    Returns:
        A sorted list of up to 5 ints, or None when the clipped anchor band
        is empty.
    """
    rnd = random.Random(int(seed) if seed is not None else 0)

    pool = []
    for s in (god_sets or []):
        for n in (s.get("numbers") or []):
            try:
                n = int(n)
                if main_min <= n <= main_max:
                    pool.append(n)
            except Exception:
                # Best effort: ignore entries that are not usable numbers.
                pass
    freq = Counter(pool)
    core = {n for n, _ in freq.most_common(7)}

    a0 = max(main_min, anchor_min)
    a1 = min(main_max, anchor_max)
    if a0 > a1:
        return None
    anchor_band = list(range(a0, a1 + 1))
    # Least frequent first, ties broken by the smaller number.
    anchor_choices = sorted(anchor_band, key=lambda n: (freq.get(n, 0), n))
    anchor = next((n for n in anchor_choices if n not in core), anchor_choices[0])

    rest = sorted(set(pool), key=lambda n: (freq.get(n, 0), n))
    rest = [n for n in rest if n not in core and n != anchor]
    if len(rest) < 10:
        # Too few non-core candidates: fall back to the whole main range.
        rest = [n for n in range(main_min, main_max + 1) if n not in core and n != anchor]
    mid = [n for n in rest if 10 <= n <= 19]
    hi = [n for n in rest if n >= 20]
    any10 = [n for n in rest if n >= 10]

    picked = [anchor]

    def pick_from(band, k):
        """Append up to ``k`` not-yet-picked numbers from ``band`` in random order."""
        candidates = [n for n in band if n not in picked]
        rnd.shuffle(candidates)
        # Count relative to the current ticket size. The previous absolute
        # threshold (1 + k) made every call after the first add one number.
        picked.extend(candidates[:max(k, 0)])

    pick_from(mid, 2)
    remaining = 5 - len(picked)
    if remaining > 0:
        pick_from(hi, remaining)
    if len(picked) < 5:
        pick_from(any10, 5 - len(picked))
    if len(picked) < 5:
        fb = [n for n in range(main_min, main_max + 1) if n not in picked]
        rnd.shuffle(fb)
        picked.extend(fb[:5 - len(picked)])

    picked = sorted(picked[:5])

    if _is_bad_run(picked, run_len=4):
        fb = [n for n in range(main_min, main_max + 1) if n not in picked and n not in core]
        fb = sorted(fb, reverse=True)
        for swap_in in fb[:40]:
            # Position 0 (the smallest number) is never replaced.
            for idx in range(1, 5):
                trial = sorted(picked[:idx] + [swap_in] + picked[idx+1:])
                if not _is_bad_run(trial, run_len=4):
                    return trial
    return picked
def build_low_anchor(god_sets, *, main_min:int, main_max:int, seed:int=0):
    """Build an anchor-spread ticket whose anchor band is main_min..min(9, main_max)."""
    band_top = min(9, main_max)
    return build_anchor_spread_ticket(
        god_sets,
        main_min=main_min,
        main_max=main_max,
        anchor_min=main_min,
        anchor_max=band_top,
        seed=seed,
    )
def build_mid_anchor(god_sets, *, main_min:int, main_max:int, seed:int=0):
    """Build an anchor-spread ticket whose anchor band is max(10, main_min)..min(19, main_max)."""
    band_bottom = max(10, main_min)
    band_top = min(19, main_max)
    return build_anchor_spread_ticket(
        god_sets,
        main_min=main_min,
        main_max=main_max,
        anchor_min=band_bottom,
        anchor_max=band_top,
        seed=seed,
    )
| # Import V3.0 backend | |
| from lotto_predictor import ( | |
| predict_for_game_v3, | |
| GAME_CONFIGS, | |
| NumpyEncoder, | |
| clean_powerball_df, | |
| load_csv_for_game | |
| ) | |
# Data paths (adjust if your files live in a data/ subfolder)
# Game entries are keyed by the display label chosen in the game selector and
# hold the draw-history CSV file name; "wheel_template" is the wheel text file
# read by load_wheel_raw_text when wheel generation is enabled.
DATA_PATHS = {
    "G5 (Gimme 5)": "gimme5_results.csv",
    "LA (Lotto America)": "la_results.csv",
    "L4L (Lucky for Life)": "Lucky For Life.csv",  # ✅ NEW
    "MB (Megabucks)": "mb_results.csv",
    "MM (Mega Millions)": "mm_results.csv",
    "PB (Powerball)": "pb_results.csv",
    "wheel_template": "wheel.txt",
}
# Page setup and title; these run at import time, before the helper sections below.
st.set_page_config(page_title="Multi Lotto AI Engine V6.0", layout="centered")
st.title("🎯 Lotto AI Engine (V6.0)")
| # ------------------------- | |
| # Helper functions for V3.0 | |
| # ------------------------- | |
def get_hot_and_cold_numbers(df: pd.DataFrame, cfg, top_n: int = 10):
    """Rank every possible main number by how often it was drawn.

    Frequencies are counted across all columns in ``cfg.main_cols``;
    numbers in ``cfg.main_min..cfg.main_max`` that never appear count as 0.

    Returns:
        (hot, cold): two lists of (number, frequency) tuples. ``hot`` holds
        the ``top_n`` most frequent numbers, most frequent first; ``cold``
        holds the ``top_n`` least frequent, coldest first.
    """
    tally = Counter()
    for col in cfg.main_cols:
        tally.update(df[col].astype(int).tolist())

    # Stable sort: numbers with equal frequency keep ascending numeric order.
    ranked = sorted(
        [(num, tally.get(num, 0)) for num in range(cfg.main_min, cfg.main_max + 1)],
        key=lambda pair: pair[1],
        reverse=True,
    )

    hot = ranked[:top_n]
    cold = ranked[-top_n:][::-1]  # reversed so the coldest number comes first
    return hot, cold
def load_wheel_raw_text(path: str) -> str:
    """Return the wheel template file's contents decoded as latin-1.

    An empty string is returned when the file does not exist or reading
    it raises.
    """
    source = Path(path)
    if source.exists():
        try:
            # latin-1 maps every byte to a character; errors="replace" is
            # kept as an extra safety net.
            return source.read_text(encoding="latin-1", errors="replace")
        except Exception:
            return ""
    return ""
def select_20_wheel_numbers(hot: list, cold: list):
    """Map wheel letters A–T onto the first 10 hot and first 10 cold numbers.

    ``hot`` and ``cold`` are lists of (number, frequency) pairs. Hot numbers
    take the first letters, cold numbers follow. When fewer than 20 numbers
    are available the trailing letters are left out of the mapping.
    """
    hot_numbers = [num for num, _freq in hot[:10]]
    cold_numbers = [num for num, _freq in cold[:10]]
    # zip stops at the shorter input, so the mapping never exceeds 20 letters.
    return dict(zip("ABCDEFGHIJKLMNOPQRST", hot_numbers + cold_numbers))
def convert_numeric_wheel_to_letter_template(raw_text: str, wheel_size: int = 20) -> str:
    """
    Convert numeric wheel lines like:
    1-01-02-06-18-19-46
    into letter-template lines:
    A B C D E

    Only lines that start with a ticket index followed by a dash are
    considered. The index is removed, and the first five 1–2 digit numbers
    that follow are mapped 1->A, 2->B, ... (wrapping modulo 26). Lines
    with fewer than five numbers after the index are skipped.

    Args:
        raw_text: full text of the numeric wheel file.
        wheel_size: currently unused; kept for interface compatibility.

    Returns:
        Newline-joined letter lines, or "" when nothing could be parsed.
    """
    if not raw_text:
        return ""

    out_lines = []
    for line in raw_text.splitlines():
        # Detect lines that start with an index + dash (e.g. " 1-01-02-06-18-19-46")
        prefix = re.match(r'^\s*\d+\s*-', line)
        if not prefix:
            continue
        # Tokenise only what follows the index. Scanning the whole line split
        # an index of 3+ digits ("100") into "10", "0" and leaked it into
        # the picks.
        picks = re.findall(r'\d{1,2}', line[prefix.end():])[:5]
        if len(picks) < 5:
            # skip if fewer than 5 picks found
            continue
        letters = [chr(ord('A') + (int(n_str) - 1) % 26) for n_str in picks]
        out_lines.append(" ".join(letters))
    return "\n".join(out_lines)
def expand_wheel_with_template(wheel_map: dict, template: str):
    """Turn letter-template lines into sorted 5-number combinations.

    Each line is split on whitespace; only its first five tokens are used.
    A line is kept only if it has at least five tokens and all of those
    first five are keys of ``wheel_map``.
    """
    combos = []
    for row in template.strip().split('\n'):
        tokens = row.strip().split()
        if len(tokens) < 5:
            continue
        resolved = [wheel_map[tok] for tok in tokens[:5] if tok in wheel_map]
        # A shorter result means at least one letter had no number assigned.
        if len(resolved) == 5:
            combos.append(sorted(resolved))
    return combos
| # ------------------------- | |
| # Display helpers | |
| # ------------------------- | |
def display_hot_cold_tables(hot_df: pd.DataFrame, cold_df: pd.DataFrame):
    """Show the hot and cold frequency tables, each inside its own expander.

    Both frames are re-indexed in place to 1..N with the index named "No".
    """
    for frame in (hot_df, cold_df):
        frame.index = range(1, len(frame) + 1)
        frame.index.name = "No"

    sections = (
        ("🔥 Hot Numbers (Top 10)", hot_df),
        ("❄️ Cold Numbers (Bottom 10)", cold_df),
    )
    for title, frame in sections:
        with st.expander(title):
            st.table(frame)
def display_wheel_table_from_hotcold(hot_df: pd.DataFrame, cold_df: pd.DataFrame):
    """Build the A–T wheel mapping from the hot/cold frames and display it.

    The mapping is rendered as a single-row table inside an expander;
    letters without an assigned number show as None.

    Returns:
        wheel_map: dict of letter -> number.
    """
    def _pairs(frame):
        # Each row is (number, frequency); the number is forced to int.
        return [(int(n), f) for n, f in frame.values]

    wheel_map = select_20_wheel_numbers(_pairs(hot_df), _pairs(cold_df))

    labels = list("ABCDEFGHIJKLMNOPQRST")
    row = [wheel_map.get(letter, None) for letter in labels]
    wheel_df = pd.DataFrame([row], columns=labels)
    with st.expander("🎡 Your 20 Numbers to Wheel"):
        st.table(wheel_df)
    return wheel_map
def display_wheel_combinations_from_raw(wheel_map: dict, raw_template_text: str):
    """Expand a numeric wheel template with ``wheel_map`` and display the tickets.

    The raw text is converted to a letter template, expanded into number
    combinations and shown as a dataframe inside an expander. A warning is
    shown, and nothing else rendered, when any stage yields nothing.
    """
    if not raw_template_text:
        st.warning("Wheel template file is empty or not found.")
        return

    letter_template = convert_numeric_wheel_to_letter_template(raw_template_text)
    if not letter_template:
        st.warning("Wheel template parsing found no valid ticket lines.")
        return

    combos = expand_wheel_with_template(wheel_map, letter_template)
    if not combos:
        st.warning("No combinations produced after expansion.")
        return

    tickets = pd.DataFrame(combos, columns=["Num1", "Num2", "Num3", "Num4", "Num5"])
    tickets.index = [f"Ticket{pos}" for pos in range(1, len(tickets) + 1)]
    tickets.index.name = "No"
    with st.expander(f"🎟️ Wheel Combinations ({len(tickets)} tickets)"):
        st.dataframe(tickets)
| # ------------------------- | |
| # ------------------------- | |
| # Results parsing + hit tracking (UI-only, no layout/color changes) | |
| # ------------------------- | |
| def _parse_results_text(txt: str): | |
| """Parse results like '1-2-3-4-5 (6)' or '1 2 3 4 5 ⭐6'. Returns (main_set, star_int_or_None).""" | |
| if not txt: | |
| return set(), None | |
| s = str(txt).strip() | |
| if not s: | |
| return set(), None | |
| # Extract numbers; first 5 are main, last (optional) treated as star/bonus if present | |
| nums = re.findall(r"\d+", s) | |
| if len(nums) < 5: | |
| return set(), None | |
| main = [int(x) for x in nums[:5]] | |
| star = None | |
| if len(nums) >= 6: | |
| try: | |
| star = int(nums[5]) | |
| except Exception: | |
| star = None | |
| return set(main), star | |
| def _ticket_main_set(tk): | |
| """Return main numbers set from a ticket dict/list/string.""" | |
| if tk is None: | |
| return set() | |
| if isinstance(tk, dict): | |
| cand = tk.get('numbers') or tk.get('nums') or tk.get('main') or [] | |
| return set(int(x) for x in cand if str(x).isdigit() or isinstance(x, int)) | |
| if isinstance(tk, (list, tuple, set)): | |
| return set(int(x) for x in tk if str(x).isdigit() or isinstance(x, int)) | |
| if isinstance(tk, str): | |
| nums = re.findall(r"\d+", tk) | |
| return set(int(x) for x in nums[:5]) if len(nums) >= 5 else set() | |
| return set() | |
| def _ticket_star(tk): | |
| if isinstance(tk, dict): | |
| for k in ('star','bonus','power','pb','mb','sb','lucky'): | |
| if k in tk and tk.get(k) is not None: | |
| try: | |
| return int(tk.get(k)) | |
| except Exception: | |
| return None | |
| return None | |
def _calc_hits(ticket, results_main:set, results_star, cfg=None):
    """Score a ticket against drawn results.

    Returns:
        (main_hits, star_hit): the number of main numbers shared with
        ``results_main``, and True/False for the bonus comparison. star_hit
        is None when no result star was given, the ticket has no usable
        star, or the ticket's star falls outside cfg.star_min..cfg.star_max.
    """
    main_hits = len(_ticket_main_set(ticket) & (results_main or set()))
    if results_star is None:
        return main_hits, None

    stv = _ticket_star(ticket)
    if stv is None and isinstance(ticket, dict):
        # fmt_ticket supports pulling star from dict; mirror that behavior loosely
        raw = ticket.get('star')
        stv = raw if isinstance(raw, (int,str)) else None
    try:
        stv = int(stv) if stv is not None else None
    except Exception:
        stv = None
    if stv is None:
        return main_hits, None

    # Clamp against cfg star range (safety)
    has_range = (
        cfg is not None
        and getattr(cfg,'star_min',None) is not None
        and getattr(cfg,'star_max',None) is not None
    )
    if has_range:
        try:
            smin, smax = int(cfg.star_min), int(cfg.star_max)
            if not (smin <= int(stv) <= smax):
                return main_hits, None
        except Exception:
            pass
    return main_hits, (stv == results_star)
| def _get_named_ticket(god_sets_list, wanted): | |
| w = (wanted or '').strip().lower() | |
| for _s in (god_sets_list or []): | |
| nm = (_s.get('style') or _s.get('name') or '').strip().lower() | |
| if nm == w: | |
| return _s | |
| return None | |
def _build_recommended_plays(*, game_key:str, primary_nums, primary_star, god_sets_list, collapse_t, neighbor_t):
    """Default 5-ticket recipe per game (UI-only): balanced + tight + wide + collapse + neighbor.

    The first slot is the engine's 'balanced' set, or the primary prediction
    when that set is absent. 'tight_cluster', 'wide_spread', the collapse
    ticket and the neighbor ticket follow when available. Remaining slots
    are filled from other god sets with distinct main numbers.

    Returns:
        Up to five (label, ticket) pairs with unique, non-empty main-number sets.
    """
    def _signature(ticket):
        # Order-independent identity of a ticket's main numbers.
        return tuple(sorted(_ticket_main_set(ticket)))

    # Prefer the engine-provided 'balanced' style; fall back to PRIMARY
    balanced = _get_named_ticket(god_sets_list, 'balanced')
    if balanced:
        picks = [('balanced', balanced)]
    else:
        picks = [('primary', {'numbers': primary_nums or [], 'star': primary_star})]

    for style in ('tight_cluster', 'wide_spread'):
        found = _get_named_ticket(god_sets_list, style)
        if found:
            picks.append((style, found))

    for label, ticket in (('collapse', collapse_t), ('neighbor', neighbor_t)):
        if ticket:
            picks.append((label, ticket))

    # Fill to 5 from remaining god sets (keeps variety)
    if len(picks) < 5:
        used = {_signature(ticket) for _label, ticket in picks}
        for candidate in (god_sets_list or []):
            sig = _signature(candidate)
            if not sig or sig in used:
                continue
            picks.append((candidate.get('style') or candidate.get('name') or 'set', candidate))
            used.add(sig)
            if len(picks) >= 5:
                break

    # Final de-dupe + cap
    out = []
    seen = set()
    for label, ticket in picks:
        sig = _signature(ticket)
        if sig and sig not in seen:
            seen.add(sig)
            out.append((label, ticket))
            if len(out) >= 5:
                break
    return out
| def _scorecard_key(game_key: str): | |
| return f"scorecard_{game_key}" | |
# UI & main logic
# -------------------------
# Display labels offered in the game selector; the same strings are used as
# keys of DATA_PATHS and GAME_KEY_MAP.
lotto_options = [
    "G5 (Gimme 5)",
    "LA (Lotto America)",
    "L4L (Lucky for Life)",  # ✅ NEW
    "MB (Megabucks)",
    "MM (Mega Millions)",
    "PB (Powerball)",
]
# The first option is preselected (index=0).
lotto_type = st.selectbox("Select Lotto Type:", options=lotto_options, index=0)
# Map display names -> keys used in GAME_CONFIGS and predict_for_game_v3
GAME_KEY_MAP = {
    "G5 (Gimme 5)": "gimme5",
    "LA (Lotto America)": "la",
    "L4L (Lucky for Life)": "l4l",  # ✅ NEW
    "MB (Megabucks)": "mb",
    "MM (Mega Millions)": "mm",
    "PB (Powerball)": "pb",
}
| try: | |
| game_key = GAME_KEY_MAP[lotto_type] | |
| data_path = DATA_PATHS[lotto_type] | |
| cfg = GAME_CONFIGS[game_key] | |
| # --- CSV change detection + age indicator (sidebar, no layout impact) --- | |
| csv_file = str(Path(data_path)) | |
| with st.sidebar: | |
| age_s = _csv_age_seconds(csv_file) | |
| if age_s is None: | |
| st.caption("📄 CSV age: unknown") | |
| else: | |
| st.caption(f"📄 CSV age: {_human_age(age_s)} ago") | |
| changed, changed_list = csv_changed_warning([csv_file], label="Draw history CSV") | |
| # Soft restart button (clears caches + reloads this session) | |
| if st.button("🔁 Restart (soft reload)"): | |
| soft_restart_app() | |
| # Optional extra nudge if a change was detected | |
| if changed: | |
| st.info("Tip: Soft restart reloads the app session; for a full process restart, stop and re-run Streamlit.") | |
| # Optional: clear cached data if CSV changed | |
| if changed: | |
| try: | |
| st.cache_data.clear() | |
| except Exception: | |
| pass | |
| # Load dataset using V3.0 loader | |
| df, _ = load_csv_for_game(Path(data_path), game_key) | |
| # Hot/cold numbers computed using our helper | |
| hot, cold = get_hot_and_cold_numbers(df, cfg) | |
| hot_df = pd.DataFrame(hot, columns=["Number", "Frequency"]) | |
| cold_df = pd.DataFrame(cold, columns=["Number", "Frequency"]) | |
| # UI options - simplified for V3.0 | |
| run_backtest = st.checkbox("🧪 Run Backtest (slower but shows model performance)", value=False) | |
| use_wheel = st.checkbox("🎡 Generate Wheel Combinations (if wheel.txt available)", value=False) | |
| wheel_raw_text = load_wheel_raw_text(DATA_PATHS["wheel_template"]) if use_wheel else "" | |
| # Styling | |
| st.markdown( | |
| """ | |
| <style> | |
| table { margin-left:auto; margin-right:auto; } | |
| th, td { text-align:center !important; vertical-align: middle !important; } | |
| </style> | |
| """, | |
| unsafe_allow_html=True, | |
| ) | |
| if st.button("🎰 Generate Prediction" if not run_backtest else "🧪 Run Backtest"): | |
| with st.spinner("Building ensemble models and generating results..."): | |
| # Run V3.0 predictor | |
| result = predict_for_game_v3( | |
| csv_path=Path(data_path), | |
| game_key=game_key, | |
| run_backtest=run_backtest | |
| ) | |
| if run_backtest: | |
| # Display backtest results | |
| if 'error' in result: | |
| st.error(f"❌ Backtest Error: {result['error']}") | |
| else: | |
| st.success("✅ Backtest Complete!") | |
| # Show summary metrics | |
| st.subheader("📊 Backtest Summary") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.metric("Model 3+ Matches", f"{result.get('model_3plus_rate', 0)}%") | |
| with col2: | |
| st.metric("Random 3+ Matches", f"{result.get('random_3plus_rate', 0)}%") | |
| with col3: | |
| st.metric("Even Count Accuracy", f"{result.get('even_count_accuracy', 0)}%") | |
| # Detailed hit rates | |
| st.subheader("🎯 Hit Rate Comparison") | |
| hit_data = [] | |
| for i in range(6): | |
| model_rate = result.get(f'model_hit_{i}_rate', 0) | |
| random_rate = result.get(f'random_hit_{i}_rate', 0) | |
| hit_data.append({ | |
| 'Matches': i, | |
| 'Model Rate (%)': model_rate, | |
| 'Random Rate (%)': random_rate, | |
| 'Improvement': f"+{model_rate - random_rate:.1f}%" if model_rate > random_rate else f"{model_rate - random_rate:.1f}%" | |
| }) | |
| hit_df = pd.DataFrame(hit_data) | |
| st.table(hit_df) | |
| # Raw results | |
| with st.expander("📋 Full Backtest Results"): | |
| st.json(result) | |
| else: | |
| # Display prediction results | |
| st.success(f"🧠 Predicted Numbers: {result['numbers']}") | |
| if result.get("star") is not None: | |
| star_col_name = cfg.star_col or 'Star' | |
| st.success(f"🌟 Predicted {star_col_name}: {result['star']}") | |
| else: | |
| st.info("ℹ️ No bonus number for this game") | |
| # Show additional info | |
| st.info(f"🔢 Total Sum: {sum(result['numbers'])} (Expected Range: {cfg.sum_min}–{cfg.sum_max})") | |
| model_info = result.get('model_info', {}) | |
| st.info(f"🤖 Models built for {model_info.get('numbers_modeled', 0)}/{model_info.get('total_possible', 0)} numbers") | |
| # ------------------------- | |
| # GOD MODE + CONSENSUS + ANCHORS (UI only) | |
| # ------------------------- | |
| god_sets = result.get("god_sets") or result.get("god_mode_sets") or result.get("godmode_sets") or [] | |
| strike = result.get("strike_tickets") or result.get("strike") or {} | |
| if god_sets: | |
| st.info("🎲 Lotto Cash Predictions") | |
| for s in god_sets: | |
| name = s.get("name", "set") | |
| nums = s.get("numbers") or [] | |
| st.info(f"{name}: {fmt_ticket({'numbers': nums, 'star': s.get('star', None)})}") | |
| # APPEND PATCH: EXTRA_PREDICTIONS_UI_V1 (APPEND-ONLY / NO DELETIONS) | |
| # Shows the two extra engine predictions (if present) without changing layout/colors. | |
| try: | |
| _extra = result.get("extra_predictions") | |
| if isinstance(_extra, dict) and (_extra.get("ml_ensemble") or _extra.get("dl_sequence")): | |
| st.info("➕ Extra Predictions") | |
| mlp = _extra.get("ml_ensemble") if isinstance(_extra.get("ml_ensemble"), dict) else None | |
| dlp = _extra.get("dl_sequence") if isinstance(_extra.get("dl_sequence"), dict) else None | |
| if mlp: | |
| st.info(f"ML Ensemble (RF/XGB): {fmt_ticket(mlp, cfg=cfg)}") | |
| if dlp: | |
| st.info(f"DL Sequence (LSTM/Transformer): {fmt_ticket(dlp, cfg=cfg)}") | |
| except Exception: | |
| pass | |
| collapse = None | |
| neighbor = None | |
| # Convergence tickets (engine may provide these directly) | |
| convergence_core = result.get("convergence_core") | |
| convergence_cooccur = result.get("convergence_cooccur") | |
| if isinstance(strike, dict): | |
| collapse = strike.get("collapse") or strike.get("consensus_collapse") or strike.get("CONSENSUS_COLLAPSE") | |
| neighbor = strike.get("neighbor") or strike.get("consensus_neighbor") or strike.get("CONSENSUS_NEIGHBOR") | |
| # If engine didn't supply these, allow them to come from strike payloads too | |
| if not convergence_core: | |
| convergence_core = strike.get("convergence_core") or strike.get("CONVERGENCE_CORE") | |
| if not convergence_cooccur: | |
| convergence_cooccur = strike.get("convergence_cooccur") or strike.get("CONVERGENCE_COOCCUR") or strike.get("CONVERGENCE_CO-OCCUR") | |
| if collapse or neighbor or convergence_core or convergence_cooccur: | |
| st.info("🎯 Consensus Tickets") | |
| if collapse: | |
| st.info(f"Consensus (Collapse): {fmt_ticket(collapse, cfg=cfg)}") | |
| if neighbor: | |
| st.info(f"Neighbor Consensus: {fmt_ticket(neighbor, cfg=cfg)}") | |
| if convergence_core: | |
| st.info(f"Convergence Core: {fmt_ticket(convergence_core, cfg=cfg)}") | |
| if convergence_cooccur: | |
| st.info(f"Convergence Co-Occur: {fmt_ticket(convergence_cooccur, cfg=cfg)}") | |
| main_min = int(getattr(cfg, "main_min", 1)) | |
| main_max = int(getattr(cfg, "main_max", 99)) | |
| seed = int(result.get("seed", 0) or 0) | |
| low_anchor = build_low_anchor(god_sets, main_min=main_min, main_max=main_max, seed=seed) | |
| mid_anchor = build_mid_anchor(god_sets, main_min=main_min, main_max=main_max, seed=seed) | |
| # Per-ticket Star Ball display (prevents the same star being shown for every line) | |
def _pick_star_for_display(offset:int=0):
    """Pick a bonus ball for display, weighted by how often it was drawn.

    Uses the enclosing scope's ``cfg``, ``df`` and ``seed``. The RNG is
    seeded with ``seed + offset`` so different offsets can give different
    stars. Each candidate in star_min..star_max is weighted by
    1 + all-time count + 2 * count in the last 80 rows, with a 10% boost
    for the five lowest balls when cfg.name is "Lotto America" or
    "Mega Millions".

    Returns:
        An int, or None when the game has no star column, the star range
        is empty, or anything raises.
    """
    try:
        from collections import Counter

        if not getattr(cfg, "star_col", None):
            return None
        smin = int(getattr(cfg, "star_min", 1))
        smax = int(getattr(cfg, "star_max", 1))
        if smax < smin:
            return None

        rnd = random.Random(int(seed) + int(offset))

        # Frequency weights: all-time + last-80 (recency boosted)
        all_draws = pd.to_numeric(df[cfg.star_col], errors="coerce").dropna().astype(int).tolist()
        if not all_draws:
            # No usable history: fall back to a uniform pick.
            return rnd.randint(smin, smax)
        recent_draws = pd.to_numeric(df[cfg.star_col].tail(80), errors="coerce").dropna().astype(int).tolist()

        freq_all = Counter(all_draws)
        freq_recent = Counter(recent_draws)
        # gentle low-zone preference for LA / MM (matches engine intent)
        low_zone_game = getattr(cfg, "name", "") in ("Lotto America", "Mega Millions")

        candidates = list(range(smin, smax + 1))
        weights = []
        for ball in candidates:
            weight = 1.0 + float(freq_all.get(ball, 0)) + 2.0 * float(freq_recent.get(ball, 0))
            if low_zone_game and ball <= (smin + 4):
                weight *= 1.10
            weights.append(weight)
        return int(rnd.choices(candidates, weights=weights, k=1)[0])
    except Exception:
        return None
| # ------------------------- | |
| # Recommended Plays (per-game, UI-only) | |
| # ------------------------- | |
| recommended = None | |
| if game_key == "gimme5": | |
| # If G5 play5 was built above, reuse it; else fall back to default recipe | |
| try: | |
| recommended = play5 if 'play5' in locals() else None | |
| except Exception: | |
| recommended = None | |
| if not recommended: | |
| recommended = _build_recommended_plays( | |
| game_key=game_key, | |
| primary_nums=result.get("numbers") or [], | |
| primary_star=result.get("star"), | |
| god_sets_list=god_sets, | |
| collapse_t=collapse, | |
| neighbor_t=neighbor, | |
| ) | |
| if recommended: | |
| st.info("🏷️ Recommended Plays (5-ticket recipe)") | |
| for lbl, tk in recommended: | |
| st.info(f"{lbl}: {fmt_ticket(tk, cfg=cfg)}") | |
| # ------------------------- | |
| # Optional: Results entry + hit highlight + rolling 30-draw scorecard (silent) | |
| # ------------------------- | |
| with st.expander("✅ Results & Hit Tracking (optional)"): | |
| results_input = st.text_input( | |
| "Paste official results (ex: 12-17-25-34-42 (9))", | |
| key=f"results_input_{game_key}", | |
| ) | |
| if st.button("Apply Results", key=f"apply_results_{game_key}"): | |
| main_set, star_val = _parse_results_text(results_input) | |
| if not main_set: | |
| st.warning("Couldn’t parse results. Example: 12-17-25-34-42 (9)") | |
| else: | |
| # Store last results in session | |
| st.session_state[f"last_results_{game_key}"] = {"main": sorted(main_set), "star": star_val, "ts": datetime.now().isoformat()} | |
| # Update rolling scorecard (30) | |
| sk = _scorecard_key(game_key) | |
| sc = st.session_state.get(sk, []) | |
| # Score recommended plays | |
| scored = [] | |
| for lbl, tk in (recommended or []): | |
| mh, sh = _calc_hits(tk, main_set, star_val, cfg=cfg) | |
| scored.append({"label": lbl, "main_hits": mh, "star_hit": sh}) | |
| sc.append({ | |
| "time": datetime.now().strftime('%Y-%m-%d %H:%M'), | |
| "results": "-".join(str(x) for x in sorted(main_set)) + (f" (⭐ {star_val})" if star_val is not None else ""), | |
| "best_main_hits": max([x["main_hits"] for x in scored], default=0), | |
| }) | |
| st.session_state[sk] = sc[-30:] | |
| st.success("Saved. Scroll down for highlights and scorecard.") | |
| # Show highlight if we have stored results | |
| stored = st.session_state.get(f"last_results_{game_key}") | |
| if stored: | |
| rmain = set(stored.get("main") or []) | |
| rstar = stored.get("star") | |
| st.write(f"**Last results:** {'-'.join(str(x) for x in sorted(rmain))}" + (f" (⭐ {rstar})" if rstar is not None else "")) | |
| if recommended: | |
| st.write("**Auto-highlight hits (Recommended Plays):**") | |
| for lbl, tk in recommended: | |
| mh, sh = _calc_hits(tk, rmain, rstar, cfg=cfg) | |
| hit_note = f"{mh} hit" + ("s" if mh != 1 else "") | |
| if sh is True: | |
| hit_note += " + ⭐" | |
| st.write(f"- {lbl}: {fmt_ticket(tk, cfg=cfg)} → {hit_note}") | |
| # Silent rolling 30-draw scorecard | |
| sk = _scorecard_key(game_key) | |
| sc = st.session_state.get(sk, []) | |
| if sc: | |
| st.write("**Rolling scorecard (last 30 saves):**") | |
| st.dataframe(pd.DataFrame(sc), use_container_width=True) | |
| st.info("🎟️ Anchor Tickets") | |
| st.info(f"LOW Anchor (1–9): {fmt_ticket({'numbers': low_anchor, 'star': _pick_star_for_display(101)})}") | |
| st.info(f"MID Anchor (10–19): {fmt_ticket({'numbers': mid_anchor, 'star': _pick_star_for_display(202)})}") | |
| lines = [] | |
| lines.append(f"GAME: {game_key}") | |
| lines.append(f"GENERATED: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| lines.append("") | |
| lines.append(f"PRIMARY: {fmt_ticket(result.get('numbers') or [], result.get('star'))}") | |
| if god_sets: | |
| lines.append("") | |
| lines.append("LOTTO CASH SETS:") | |
| for s in god_sets: | |
| lines.append(f"- {s.get('name','set')}: {fmt_ticket(s, cfg=cfg)}") | |
| if collapse or neighbor or (convergence_core is not None) or (convergence_cooccur is not None): | |
| lines.append("") | |
| lines.append("CONSENSUS:") | |
| if collapse: | |
| lines.append(f"- collapse: {fmt_ticket(collapse, cfg=cfg)}") | |
| if neighbor: | |
| lines.append(f"- neighbor: {fmt_ticket(neighbor, cfg=cfg)}") | |
| if convergence_core is not None: | |
| lines.append(f"- convergence_core: {fmt_ticket(convergence_core, cfg=cfg)}") | |
| if convergence_cooccur is not None: | |
| lines.append(f"- convergence_cooccur: {fmt_ticket(convergence_cooccur, cfg=cfg)}") | |
| lines.append("") | |
| lines.append(f"LOW ANCHOR (1–9): {fmt_ticket({'numbers': low_anchor, 'star': _pick_star_for_display(101)})}") | |
| lines.append(f"MID ANCHOR (10–19): {fmt_ticket({'numbers': mid_anchor, 'star': _pick_star_for_display(202)})}") | |
| content = "\n".join(lines).strip() + "\n" | |
| fname = f"{game_key}_tickets_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" | |
| st.download_button("⬇️ Download tickets (timestamped)", data=content.encode("utf-8"), | |
| file_name=fname, mime="text/plain") | |
| # ------------------------------------------------------------ | |
| # NITRO PACK download add-on (append-only; does not change layout) | |
| # Creates an additional download option that includes Nitro Pack tickets | |
| # if the engine provided them in the result payload. | |
| # ------------------------------------------------------------ | |
| try: | |
| nitro_pack = ( | |
| result.get("nitro_pack") | |
| or result.get("nitro") | |
| or result.get("nitro_pack_tickets") | |
| or result.get("nitro_tickets") | |
| ) | |
| if isinstance(nitro_pack, dict): | |
| # allow {"tickets":[...]} or {"sets":[...]} | |
| nitro_pack = nitro_pack.get("tickets") or nitro_pack.get("sets") or nitro_pack.get("pack") or [] | |
| if nitro_pack and isinstance(nitro_pack, (list, tuple)): | |
| lines2 = list(lines) | |
| lines2.append("") | |
| lines2.append("NITRO PACK:") | |
| for i_np, tk_np in enumerate(nitro_pack, start=1): | |
| try: | |
| lines2.append(f"- nitro_{i_np}: {fmt_ticket(tk_np, cfg=cfg)}") | |
| except Exception: | |
| # fall back if tk_np is just a list of numbers | |
| try: | |
| lines2.append(f"- nitro_{i_np}: {fmt_ticket({'numbers': tk_np, 'star': None}, cfg=cfg)}") | |
| except Exception: | |
| pass | |
| content2 = "\n".join(lines2).strip() + "\n" | |
| fname2 = f"{game_key}_tickets_NITRO_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" | |
| st.download_button( | |
| "⬇️ Download tickets + Nitro Pack", | |
| data=content2.encode("utf-8"), | |
| file_name=fname2, | |
| mime="text/plain", | |
| ) | |
| except Exception: | |
| pass | |
| # Wheel (if requested) | |
| if use_wheel and wheel_raw_text: | |
| # Build and show wheel table (A..T mapping) | |
| wheel_map = display_wheel_table_from_hotcold(hot_df, cold_df) | |
| # Expand numeric wheel.txt to letter-template and display combos | |
| display_wheel_combinations_from_raw(wheel_map, wheel_raw_text) | |
| elif use_wheel: | |
| st.warning("⚠️ Wheel template file (wheel.txt) not found or empty") | |
| except FileNotFoundError: | |
| st.error(f"❌ File not found: `{data_path}`") | |
| except Exception as e: | |
| st.error(f"⚠️ Error: {str(e)}") | |
| import traceback | |
| st.error(f"Details: {traceback.format_exc()}") | |
| # ============================================================ | |
| # APPEND PATCH: NITRO_PACK_UI_V2 (APPEND-ONLY / NO DELETIONS) | |
| # - Patches lotto_predictor.predict_for_game_v3 inside sys.modules so that | |
| # future Streamlit reruns import the wrapped function automatically. | |
| # - Stores last engine result in st.session_state. | |
| # - Renders 4 additional prediction lines below existing UI. | |
| # ============================================================ | |
def _NITRO_UI__patch_engine_predictor():
    """Wrap ``lotto_predictor.predict_for_game_v3`` so every call is recorded.

    The already-imported ``lotto_predictor`` module is looked up in
    ``sys.modules`` and its ``predict_for_game_v3`` attribute is replaced by a
    wrapper. The wrapper calls the original, then stores the returned value in
    ``st.session_state["_nitro_last_result"]`` and the game key in
    ``st.session_state["_nitro_last_game_key"]``.

    Nothing happens when the module has not been imported, when the function
    is missing, or when it is already wrapped (``_nitro_wrapped`` marker).
    This function never raises.
    """
    try:
        import functools as _functools
        import sys as _sys

        _mod = _sys.modules.get("lotto_predictor")
        if _mod is None:
            return
        _orig = getattr(_mod, "predict_for_game_v3", None)
        if _orig is None or getattr(_orig, "_nitro_wrapped", False):
            return

        # functools.wraps keeps __name__/__doc__/__wrapped__ of the original so
        # introspection and tracebacks still point at the real predictor.
        @_functools.wraps(_orig)
        def _wrapped_predict_for_game_v3(*args, **kwargs):
            res = _orig(*args, **kwargs)
            try:
                st.session_state["_nitro_last_result"] = res
                # NOTE(review): assumes the game key is the ``game_key`` keyword
                # or the second positional argument -- confirm against the
                # engine's signature.
                gk = None
                if "game_key" in kwargs:
                    gk = kwargs.get("game_key")
                elif len(args) >= 2:
                    gk = args[1]
                st.session_state["_nitro_last_game_key"] = gk
            except Exception:
                # Recording is best-effort; the engine result is always returned.
                pass
            return res

        # Marker checked above so repeated reruns never double-wrap.
        _wrapped_predict_for_game_v3._nitro_wrapped = True  # type: ignore
        setattr(_mod, "predict_for_game_v3", _wrapped_predict_for_game_v3)
    except Exception:
        return
def _NITRO_UI__get_ticket(res: dict, key: str):
    """Look up a ticket by ``key`` in an engine result payload.

    The top level of ``res`` is checked first. When the value there is missing
    or falsy, the ``"nitro_pack"`` entry is consulted, provided it is a dict.
    Returns ``None`` when ``res`` is not a dict or no ticket is found.
    """
    if isinstance(res, dict):
        top_level = res.get(key)
        if top_level:
            return top_level
        pack = res.get("nitro_pack")
        return pack.get(key) if isinstance(pack, dict) else None
    return None
def _NITRO_UI__render():
    """Render the Nitro Pack info lines for the last captured engine result.

    Reads ``st.session_state["_nitro_last_result"]`` and returns without
    rendering when it is not a dict or carries an ``"error"`` key. Otherwise
    emits, in order: a header, the Markov add-on line, the RF line, the
    Jackpot Chase line, and a live-compare line that scores the engine primary
    and the three Nitro tickets against the text stored under
    ``results_input_<game key>``.
    """
    last_result = st.session_state.get("_nitro_last_result")
    if not isinstance(last_result, dict) or "error" in last_result:
        return

    # A module-level ``game_key`` (when defined) overrides the captured key.
    active_game = st.session_state.get("_nitro_last_game_key")
    try:
        module_ns = globals()
        if "game_key" in module_ns:
            active_game = module_ns.get("game_key")
    except Exception:
        pass

    # Per-game config is optional; formatting falls back to cfg=None.
    try:
        has_cfg = bool(active_game) and "GAME_CONFIGS" in globals()
        cfg_local = GAME_CONFIGS.get(active_game) if has_cfg else None
    except Exception:
        cfg_local = None

    markov_tk = _NITRO_UI__get_ticket(last_result, "markov_addon")
    rf_tk = _NITRO_UI__get_ticket(last_result, "rf_line")
    chase_tk = _NITRO_UI__get_ticket(last_result, "jackpot_chase")

    missing = "(not available for this run)"
    st.info("⚡ Nitro Pack (4 extra lines)")
    markov_text = fmt_ticket(markov_tk, cfg=cfg_local) if markov_tk else missing
    st.info(f"🔁 Markov Add-on: {markov_text}")
    rf_text = fmt_ticket(rf_tk, cfg=cfg_local) if rf_tk else missing
    st.info(f"🌲 RF-only ML Line: {rf_text}")

    if not chase_tk:
        st.info("🚀 Jackpot Chase Mode (optional): (not available for this run)")
    else:
        # The chase ticket may carry its own display label under "mode".
        try:
            mode_label = chase_tk.get("mode")
        except Exception:
            mode_label = None
        heading = f"🚀 {mode_label}" if mode_label else "🚀 Jackpot Chase Mode (optional)"
        st.info(f"{heading}: {fmt_ticket(chase_tk, cfg=cfg_local)}")

    compare_hint = "⚖️ Live compare (paste official results to score Engine vs RF vs Markov vs Chase)"
    try:
        pasted = st.session_state.get(f"results_input_{active_game}", "") if active_game else ""
        main_set, _star = _parse_results_text(pasted)
        if not main_set:
            st.info(compare_hint)
        else:
            def _count_hits(ticket):
                return len(_ticket_main_set(ticket) & main_set)

            engine_primary = {
                "numbers": last_result.get("numbers") or [],
                "star": last_result.get("star"),
            }
            engine_hits = _count_hits(engine_primary)
            rf_hits = _count_hits(rf_tk) if rf_tk else 0
            markov_hits = _count_hits(markov_tk) if markov_tk else 0
            chase_hits = _count_hits(chase_tk) if chase_tk else 0
            st.info(f"⚖️ Live compare vs pasted results: Engine={engine_hits}/5 • RF={rf_hits}/5 • Markov={markov_hits}/5 • Chase={chase_hits}/5")
    except Exception:
        # Any parsing/scoring failure degrades to the plain hint line.
        st.info(compare_hint)
# Patch engine module for future reruns, then render any stored output.
# Both calls execute every time this script body runs. Each is wrapped
# separately so a failure in the patch step does not prevent rendering, and
# neither can raise into the rest of the page.
try:
    _NITRO_UI__patch_engine_predictor()
except Exception:
    pass
try:
    _NITRO_UI__render()
except Exception:
    pass
| # ========================= | |
| # NITRO PATCH v3 (append-only) | |
| # Adds an explicit checkbox/tickbox for Live Compare display. | |
| # Does NOT change any layout above; this renders at the very bottom. | |
| # ========================= | |
def _NITRO_UI__live_compare_checkbox_panel():
    """Render an optional expander with a checkbox that enables Live Compare.

    When the checkbox is ticked, the most recently saved ``last_results_*``
    entry in ``st.session_state`` (ordered by its ``"ts"`` value) is scored
    against the predictions cached under ``nitro_last_predictions_<game>`` (or
    the generic ``nitro_last_predictions``), and one summary line with hit
    counts for Engine / RF / Markov / Chase is shown. Lines without a cached
    ticket are shown as ``N/A``. Never raises.
    """
    try:
        import streamlit as st
    except Exception:
        return

    def _main_set(tk):
        """Return the main numbers of ``tk`` as a set of ints (empty on failure)."""
        try:
            if tk is None:
                return set()
            if isinstance(tk, dict):
                nums = tk.get("numbers") or tk.get("nums") or tk.get("main") or []
            elif isinstance(tk, (list, tuple)):
                nums = list(tk)
            else:
                nums = [int(x) for x in re.findall(r"\d+", str(tk))][:5]
            return set(int(x) for x in nums if str(x).strip() != "")
        except Exception:
            return set()

    try:
        # NOTE(review): original indentation was lost; everything below is
        # assumed to render inside the expander -- confirm against the app.
        with st.expander("⚖️ Live Compare (optional)", expanded=False):
            enabled = st.checkbox(
                "Enable Live compare (paste official results to score Engine vs RF vs Markov vs Chase)",
                value=False,
                key="nitro_live_compare_enabled",
                help="If enabled, this panel will show the live hit counts based on the last saved results.",
            )
            st.caption("Tip: Use the existing '✅ Results & Hit Tracking (optional)' expander above, paste official results, then click 'Apply Results'.")
            if not enabled:
                return

            # Infer the active game from saved-results keys in session state.
            result_keys = [
                name
                for name in st.session_state.keys()
                if isinstance(name, str) and name.startswith("last_results_")
            ]
            if not result_keys:
                st.info("No saved results yet. Paste official results above and click 'Apply Results' first.")
                return

            def _saved_at(name):
                try:
                    payload = st.session_state.get(name) or {}
                    return payload.get("ts") or ""
                except Exception:
                    return ""

            # First key carrying the greatest timestamp wins.
            newest_key = max(result_keys, key=_saved_at)
            game_key = newest_key[len("last_results_"):]
            stored = st.session_state.get(f"last_results_{game_key}") or {}
            main = set(stored.get("main") or [])
            if not main:
                st.info("Saved results found but could not parse main numbers.")
                return

            pred = (
                st.session_state.get(f"nitro_last_predictions_{game_key}")
                or st.session_state.get("nitro_last_predictions")
                or {}
            )
            tickets = (
                ("Engine", pred.get("engine") or pred.get("primary") or pred.get("balanced")),
                ("RF", pred.get("rf")),
                ("Markov", pred.get("markov")),
                ("Chase", pred.get("chase") or pred.get("jackpot_chase")),
            )

            parts = []
            for label, ticket in tickets:
                if ticket:
                    parts.append(f"{label}={len(_main_set(ticket) & main)}/5")
                else:
                    parts.append(f"{label}=N/A")
            st.success(" • ".join(parts))
            st.caption("If any are N/A, it means that line wasn't cached in session for this run (the main UI line still works).")
    except Exception:
        # Never break the app
        return
# Render the checkbox panel last.
# Best-effort: the panel already guards itself, and this outer guard also
# covers a failure to resolve or call the function.
try:
    _NITRO_UI__live_compare_checkbox_panel()
except Exception:
    pass
| # ====================================================================================== | |
| # NITRO PACK V4 ADD-ON (append-only) | |
| # - Performance memory: keep last N scored results for Engine/RF/Markov/Chase | |
| # - Auto Chase helper: optional checkbox + additional download that includes | |
| # a "recommended" bundle when the engine has been cold | |
| # - RF confidence display (if provided by engine) | |
| # NOTE: This block is strictly append-only and does not modify any code above. | |
| # ====================================================================================== | |
def _nitro__safe_fmt_nums(nums):
    """Format a ticket's main numbers as a dash-joined string, e.g. ``"3-11-24"``.

    Accepted inputs:
      * ``None`` -> ``""``
      * a dict -> the first truthy value of ``"numbers"``, ``"nums"``,
        ``"main"`` is formatted
      * a list/tuple -> each entry is converted with ``int``
      * anything else -> the first five digit runs of ``str(nums)``

    Blank or whitespace-only entries in a list/tuple are skipped (the same
    rule ``_nitro__hits`` applies) so one empty cell no longer blanks the
    whole line. Any other conversion failure still yields ``""``.
    """
    try:
        if nums is None:
            return ""
        if isinstance(nums, dict):
            nums = nums.get("numbers") or nums.get("nums") or nums.get("main") or []
        if isinstance(nums, (tuple, list)):
            return "-".join(str(int(x)) for x in nums if str(x).strip() != "")
        # Free-form text: keep at most five numbers.
        return "-".join(re.findall(r"\d+", str(nums))[:5])
    except Exception:
        return ""
def _nitro__hits(pred_ticket, result_main_set):
    """Count how many predicted main numbers appear in the drawn numbers.

    ``pred_ticket`` may be a dict (first truthy of ``"numbers"``, ``"nums"``,
    ``"main"``), a list/tuple, or any object whose ``str()`` contains digit
    runs (only the first five are used). Blank entries are ignored.

    ``result_main_set`` is any iterable of drawn numbers. Numeric strings in
    it are converted to ``int`` so that results stored as text still match
    integer predictions; other values are compared as given.

    Returns the hit count as an ``int``, or ``None`` when ``pred_ticket`` is
    ``None`` or anything fails to convert.
    """
    try:
        if pred_ticket is None:
            return None
        if isinstance(pred_ticket, dict):
            nums = pred_ticket.get("numbers") or pred_ticket.get("nums") or pred_ticket.get("main") or []
        elif isinstance(pred_ticket, (list, tuple)):
            nums = list(pred_ticket)
        else:
            nums = [int(x) for x in re.findall(r"\d+", str(pred_ticket))][:5]
        predicted = {int(x) for x in nums if str(x).strip() != ""}

        drawn = set()
        for value in result_main_set:
            if isinstance(value, str):
                try:
                    value = int(value.strip())
                except ValueError:
                    # Non-numeric text cannot match a prediction; keep it raw.
                    pass
            drawn.add(value)
        return len(predicted & drawn)
    except Exception:
        return None
def _NITRO_UI__cache_last_predictions():
    """Cache the last rendered predictions so Live Compare + performance scoring can work.

    Reads ``_nitro_last_result`` / ``_nitro_last_game_key`` from
    ``st.session_state`` and writes a snapshot with the keys ``engine``,
    ``rf``, ``markov``, ``chase``, ``rf_confidence`` and ``ts`` to
    ``nitro_last_predictions_<game key>`` and to the generic
    ``nitro_last_predictions``. Does nothing when there is no dict result or
    no game key. Never raises.
    """
    try:
        res = st.session_state.get("_nitro_last_result")
        gk = st.session_state.get("_nitro_last_game_key")
        if not isinstance(res, dict) or not gk:
            return

        def _balanced(container_key):
            # Only dict containers can hold a "balanced" ticket; other shapes
            # (e.g. a list of sets) are ignored instead of aborting the cache.
            container = res.get(container_key)
            return container.get("balanced") if isinstance(container, dict) else None

        # Extract best-known tickets
        engine_ticket = res.get("primary") or _balanced("god_mode") or _balanced("god_mode_sets")
        if not engine_ticket and res.get("numbers"):
            # The rest of the UI reads the primary ticket from the top-level
            # "numbers"/"star" keys, so use them when no explicit ticket exists.
            engine_ticket = {"numbers": res.get("numbers"), "star": res.get("star")}

        nitro_pack = res.get("nitro_pack")
        if not isinstance(nitro_pack, dict):
            nitro_pack = {}
        mk = res.get("markov_addon") or nitro_pack.get("markov_addon")
        rf = res.get("rf_line") or nitro_pack.get("rf_line")
        jc = res.get("jackpot_chase") or nitro_pack.get("jackpot_chase")

        snapshot = {
            "engine": engine_ticket,
            "rf": rf,
            "markov": mk,
            "chase": jc,
            "rf_confidence": res.get("rf_confidence") or (rf.get("confidence") if isinstance(rf, dict) else None),
            "ts": datetime.now().isoformat(),
        }
        st.session_state[f"nitro_last_predictions_{gk}"] = snapshot
        # Also keep a generic last snapshot for safety
        st.session_state["nitro_last_predictions"] = snapshot
    except Exception:
        return
def _NITRO_UI__score_if_new_results():
    """If the user saved official results, score the cached lines and append to history.

    The most recent ``last_results_*`` entry in ``st.session_state`` (ordered
    by its ``"ts"`` value) is scored against ``nitro_last_predictions_<game>``
    with ``_nitro__hits``. The entry is appended to
    ``nitro_score_history_<game>`` (capped at the last 12) and its timestamp
    is remembered in ``nitro_last_scored_ts_<game>`` so the same results are
    scored only once.

    When none of the four lines can be scored (no cached predictions yet),
    nothing is recorded and the timestamp is not marked, so scoring is retried
    on a later rerun. Never raises.
    """
    try:
        last_keys = [k for k in st.session_state.keys() if isinstance(k, str) and k.startswith("last_results_")]
        if not last_keys:
            return

        def _ts(k):
            try:
                v = st.session_state.get(k) or {}
                return v.get("ts") or ""
            except Exception:
                return ""

        last_key = sorted(last_keys, key=_ts, reverse=True)[0]
        gk = last_key.replace("last_results_", "", 1)
        stored = st.session_state.get(last_key) or {}
        main = stored.get("main") or []
        rts = stored.get("ts") or ""
        if not main or not rts:
            return

        scored_key = f"nitro_last_scored_ts_{gk}"
        if st.session_state.get(scored_key) == rts:
            return  # these results were already scored

        pred = st.session_state.get(f"nitro_last_predictions_{gk}") or {}
        line_names = ("engine", "rf", "markov", "chase")
        entry = {"ts": rts}
        for name in line_names:
            entry[name] = _nitro__hits(pred.get(name), main)

        if all(entry[name] is None for name in line_names):
            # Nothing to score yet: avoid an all-empty history row and leave
            # the timestamp unmarked so it can be scored once tickets exist.
            return

        hist_key = f"nitro_score_history_{gk}"
        hist = st.session_state.get(hist_key) or []
        if not isinstance(hist, list):
            hist = []
        hist.append(entry)
        st.session_state[hist_key] = hist[-12:]
        st.session_state[scored_key] = rts
    except Exception:
        return
def _NITRO_UI__performance_panel():
    """Render the optional "Nitro Pack Performance" expander.

    Refreshes the prediction cache and the score history first, then shows:
    per-line hit averages over the stored history, the RF confidence (when
    cached), up to the ten most recent history rows, a checkbox that enables a
    Jackpot Chase suggestion when the engine average is below 1.0, and a
    download button with the cached Engine/Markov/RF/Chase lines.

    History cells that could not be scored are displayed as ``N/A``.
    Never raises.
    """
    try:
        _NITRO_UI__cache_last_predictions()
        _NITRO_UI__score_if_new_results()

        gk = st.session_state.get("_nitro_last_game_key")
        if not gk:
            # No engine run captured: fall back to any game that has history.
            hist_keys = [k for k in st.session_state.keys() if isinstance(k, str) and k.startswith("nitro_score_history_")]
            if not hist_keys:
                return
            gk = hist_keys[-1].replace("nitro_score_history_", "", 1)

        hist = st.session_state.get(f"nitro_score_history_{gk}") or []
        if not isinstance(hist, list):
            hist = []

        # NOTE(review): original indentation was lost; everything below is
        # assumed to render inside the expander -- confirm against the app.
        with st.expander("📈 Nitro Pack Performance (optional)", expanded=False):
            st.caption("Scores update automatically after you paste official results and click 'Apply Results'.")
            if not hist:
                st.info("No scored results yet. Paste official results above and click 'Apply Results' to start tracking.")
                return

            def _avg(field):
                # Only integer scores count; None (not scored) is excluded.
                vals = [h.get(field) for h in hist if isinstance(h.get(field), int)]
                return (sum(vals) / len(vals)) if vals else None

            def _cell(row, field):
                # The key is always present, so a dict default would never be
                # used; map an unscored None to N/A explicitly.
                value = row.get(field)
                return "N/A" if value is None else f"{value}/5"

            a_eng = _avg("engine")
            a_rf = _avg("rf")
            a_mk = _avg("markov")
            a_jc = _avg("chase")
            st.success(
                "Averages (last {}): Engine={} RF={} Markov={} Chase={}".format(
                    len(hist),
                    "N/A" if a_eng is None else f"{a_eng:.2f}/5",
                    "N/A" if a_rf is None else f"{a_rf:.2f}/5",
                    "N/A" if a_mk is None else f"{a_mk:.2f}/5",
                    "N/A" if a_jc is None else f"{a_jc:.2f}/5",
                )
            )

            pred = st.session_state.get(f"nitro_last_predictions_{gk}") or {}
            rf_conf = pred.get("rf_confidence")
            if rf_conf is not None:
                try:
                    st.info(f"RF Confidence: {float(rf_conf):.1f}% (heuristic from RF probabilities)")
                except Exception:
                    st.info(f"RF Confidence: {rf_conf}")

            for h in reversed(hist[-10:]):
                st.write(
                    f"{h.get('ts','')[:19]} • Engine={_cell(h, 'engine')} • RF={_cell(h, 'rf')} • Markov={_cell(h, 'markov')} • Chase={_cell(h, 'chase')}"
                )

            cold = (a_eng is not None and a_eng < 1.0)
            auto = st.checkbox(
                "🚀 Auto-suggest Jackpot Chase when Engine is cold (avg < 1.0 hit)",
                value=bool(st.session_state.get("nitro_auto_chase", False)),
                key="nitro_auto_chase",
                help="Does not change your main output; it only adds a helper suggestion + a separate download option below.",
            )
            if auto and cold:
                st.warning("Engine has been cold recently. Jackpot Chase is suggested for upside (optional).")

            try:
                pred2 = st.session_state.get(f"nitro_last_predictions_{gk}") or {}
                lines = [
                    f"GAME: {gk}",
                    f"GENERATED: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                    "",
                    f"ENGINE: {_nitro__safe_fmt_nums(pred2.get('engine'))}",
                    f"MARKOV: {_nitro__safe_fmt_nums(pred2.get('markov'))}",
                    f"RF: {_nitro__safe_fmt_nums(pred2.get('rf'))}",
                    f"CHASE: {_nitro__safe_fmt_nums(pred2.get('chase'))}",
                    "",
                ]
                if auto and cold:
                    lines.append("RECOMMENDED: ENGINE + CHASE (Engine cold; Chase optional)")
                content = "\n".join(lines).strip() + "\n"
                st.download_button(
                    "⬇️ Download Nitro Lines (Engine/RF/Markov/Chase)",
                    data=content.encode("utf-8"),
                    file_name=f"{gk}_nitro_lines_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
                    mime="text/plain",
                )
            except Exception:
                # The download is an optional extra; never fail the panel for it.
                pass
    except Exception:
        return
# Render the performance panel; it also refreshes the prediction cache and
# score history as its first steps. Best-effort: errors are swallowed here.
try:
    _NITRO_UI__performance_panel()
except Exception:
    pass