import streamlit as st import pandas as pd from pathlib import Path import re import json from collections import Counter from datetime import datetime import random import os import time # ------------------------- # CSV freshness guardrails (UI-only, no layout/color changes) # ------------------------- def _human_age(seconds: float) -> str: try: s = int(max(0, seconds)) except Exception: return "unknown" if s < 60: return f"{s}s" m, s = divmod(s, 60) if m < 60: return f"{m}m {s}s" h, m = divmod(m, 60) if h < 24: return f"{h}h {m}m" d, h = divmod(h, 24) return f"{d}d {h}h" def _csv_signature(path: str): """Fast CSV signature: (exists, mtime_ns, size).""" try: stat = os.stat(path) return (True, stat.st_mtime_ns, stat.st_size) except FileNotFoundError: return (False, None, None) except Exception: return (False, None, None) def _csv_age_seconds(path: str): try: stat = os.stat(path) return max(0.0, time.time() - float(stat.st_mtime)) except Exception: return None def csv_changed_warning(csv_paths, label="CSV"): """ Show a warning if any CSV changed since last run (session_state). Returns (changed_any, changed_list). """ if "csv_sigs" not in st.session_state: st.session_state["csv_sigs"] = {} changed_any = False changed_list = [] for p in csv_paths: sig = _csv_signature(p) old = st.session_state["csv_sigs"].get(p) if old is None: # First time this session — just store it st.session_state["csv_sigs"][p] = sig continue if sig != old: changed_any = True changed_list.append(p) st.session_state["csv_sigs"][p] = sig if changed_any: st.warning( f"⚠️ {label} updated since last run. " f"Soft restart recommended for clean recency windows.\n\n" + "\n".join(f"• {os.path.basename(x)}" for x in changed_list) ) return changed_any, changed_list def soft_restart_app(): """Best-effort 'restart' inside Streamlit: clears caches + session state then reruns.""" try: st.cache_data.clear() except Exception: pass try: # Clear all session state keys (widgets will reinitialize) for k in list(st.session_state.keys()): del st.session_state[k] except Exception: pass try: st.rerun() except Exception: pass # --- Gimme5 Default Mode (UI-only, safe) --- g5_default_mode = False def fmt_ticket(nums, star=None, cfg=None): """Format a ticket for display. Accepts: - list/tuple of numbers - dict with keys like {'numbers':[...], 'star':...} (or bonus/power) - pre-formatted string Clamps bonus range using cfg.star_min/star_max when cfg is provided. """ if nums is None: return "" # If dict ticket, prefer its own star/bonus/power unless an explicit star was provided if isinstance(nums, dict): if star is None: for k in ("star", "bonus", "power", "pb", "mb", "sb", "lucky"): if k in nums and isinstance(nums.get(k), (int, str)): star = nums.get(k) break nums = nums.get("numbers") or nums.get("nums") or nums.get("main") or [] # If already a string (rare but supported), just attach star if valid if isinstance(nums, str): s = nums.strip() if not s: return "" if star is not None: try: st = int(star) if cfg is not None and getattr(cfg, "star_min", None) is not None and getattr(cfg, "star_max", None) is not None: if not (int(cfg.star_min) <= st <= int(cfg.star_max)): st = None if st is not None: return f"{s} (⭐ {st})" except Exception: pass return s # Normalize numbers list clean = [] for x in (nums or []): try: clean.append(int(x)) except Exception: continue # Clamp main range if cfg provided (safe-guard) if cfg is not None and getattr(cfg, "main_min", None) is not None and getattr(cfg, "main_max", None) is not None: try: mn, mx = int(cfg.main_min), int(cfg.main_max) clean = [n for n in clean if mn <= n <= mx] except Exception: pass s = "-".join(str(n) for n in clean) # Attach bonus ball if present (and in-range for this game) if star is not None and s: try: st = int(star) if cfg is not None and getattr(cfg, "star_min", None) is not None and getattr(cfg, "star_max", None) is not None: if not (int(cfg.star_min) <= st <= int(cfg.star_max)): st = None if st is not None: return f"{s} (⭐ {st})" except Exception: pass return s def _is_bad_run(nums, run_len=4): s = sorted(set(int(x) for x in nums)) streak = 1 for i in range(1, len(s)): if s[i] == s[i-1] + 1: streak += 1 if streak >= run_len: return True else: streak = 1 return False def build_anchor_spread_ticket(god_sets, *, main_min:int, main_max:int, anchor_min:int, anchor_max:int, seed:int=0): from collections import Counter rnd = random.Random(int(seed) if seed is not None else 0) pool = [] for s in (god_sets or []): for n in (s.get("numbers") or []): try: n = int(n) if main_min <= n <= main_max: pool.append(n) except Exception: pass freq = Counter(pool) core = set([n for n, _ in freq.most_common(7)]) a0 = max(main_min, anchor_min) a1 = min(main_max, anchor_max) if a0 > a1: return None anchor_band = list(range(a0, a1 + 1)) anchor_choices = sorted(anchor_band, key=lambda n: (freq.get(n, 0), n)) anchor = next((n for n in anchor_choices if n not in core), anchor_choices[0]) rest = sorted(set(pool), key=lambda n: (freq.get(n, 0), n)) rest = [n for n in rest if n not in core and n != anchor] if len(rest) < 10: rest = [n for n in range(main_min, main_max + 1) if n not in core and n != anchor] mid = [n for n in rest if 10 <= n <= 19] hi = [n for n in rest if n >= 20] any10 = [n for n in rest if n >= 10] picked = [anchor] def pick_from(band, k): band2 = [n for n in band if n not in picked] rnd.shuffle(band2) for n in band2: picked.append(n) if len(picked) >= 1 + k: break pick_from(mid, 2) remaining = 5 - len(picked) if remaining > 0: pick_from(hi, remaining) if len(picked) < 5: pick_from(any10, 5 - len(picked)) if len(picked) < 5: fb = [n for n in range(main_min, main_max + 1) if n not in picked] rnd.shuffle(fb) for n in fb: picked.append(n) if len(picked) >= 5: break picked = sorted(picked[:5]) if _is_bad_run(picked, run_len=4): fb = [n for n in range(main_min, main_max + 1) if n not in picked and n not in core] fb = sorted(fb, reverse=True) for swap_in in fb[:40]: for idx in range(1, 5): trial = sorted(picked[:idx] + [swap_in] + picked[idx+1:]) if not _is_bad_run(trial, run_len=4): return trial return picked def build_low_anchor(god_sets, *, main_min:int, main_max:int, seed:int=0): return build_anchor_spread_ticket(god_sets, main_min=main_min, main_max=main_max, anchor_min=main_min, anchor_max=min(9, main_max), seed=seed) def build_mid_anchor(god_sets, *, main_min:int, main_max:int, seed:int=0): return build_anchor_spread_ticket(god_sets, main_min=main_min, main_max=main_max, anchor_min=max(10, main_min), anchor_max=min(19, main_max), seed=seed) # Import V3.0 backend from lotto_predictor import ( predict_for_game_v3, GAME_CONFIGS, NumpyEncoder, clean_powerball_df, load_csv_for_game ) # Data paths (adjust if your files live in a data/ subfolder) DATA_PATHS = { "G5 (Gimme 5)": "gimme5_results.csv", "LA (Lotto America)": "la_results.csv", "L4L (Lucky for Life)": "Lucky For Life.csv", # ✅ NEW "MB (Megabucks)": "mb_results.csv", "MM (Mega Millions)": "mm_results.csv", "PB (Powerball)": "pb_results.csv", "wheel_template": "wheel.txt", } st.set_page_config(page_title="Multi Lotto AI Engine V6.0", layout="centered") st.title("🎯 Lotto AI Engine (V6.0)") # ------------------------- # Helper functions for V3.0 # ------------------------- def get_hot_and_cold_numbers(df: pd.DataFrame, cfg, top_n: int = 10): """Calculate hot and cold numbers from the dataframe""" # Count frequency of each number across all main columns all_numbers = [] for col in cfg.main_cols: all_numbers.extend(df[col].astype(int).tolist()) freq_counter = Counter(all_numbers) # Get all possible numbers for this game all_possible = list(range(cfg.main_min, cfg.main_max + 1)) # Create frequency list with zeros for missing numbers freq_list = [(num, freq_counter.get(num, 0)) for num in all_possible] # Sort by frequency sorted_by_freq = sorted(freq_list, key=lambda x: x[1], reverse=True) # Hot numbers (most frequent) hot = sorted_by_freq[:top_n] # Cold numbers (least frequent) cold = sorted_by_freq[-top_n:] cold.reverse() # Show coldest first return hot, cold @st.cache_data def load_wheel_raw_text(path: str) -> str: """ Read wheel template as raw text using latin-1 fallback (robust to special bytes). Returns empty string if missing or unreadable. """ p = Path(path) if not p.exists(): return "" try: # latin-1 will never fail for single-byte encodings; errors='replace' for safety text = p.read_text(encoding="latin-1", errors="replace") return text except Exception: return "" def select_20_wheel_numbers(hot: list, cold: list): """Select 20 numbers for wheeling using hot/cold analysis""" wheel_map = {} wheel_labels = list("ABCDEFGHIJKLMNOPQRST") # Take top 10 hot numbers hot_numbers = [num for num, freq in hot[:10]] # Take bottom 10 cold numbers cold_numbers = [num for num, freq in cold[:10]] # Combine them selected_numbers = hot_numbers + cold_numbers # Map to letters A-T for i, letter in enumerate(wheel_labels): if i < len(selected_numbers): wheel_map[letter] = selected_numbers[i] return wheel_map def convert_numeric_wheel_to_letter_template(raw_text: str, wheel_size: int = 20) -> str: """ Convert numeric wheel lines like: 1-01-02-06-18-19-46 into letter-template lines: A B C D E """ if not raw_text: return "" lines = raw_text.splitlines() out_lines = [] for line in lines: # Detect lines that start with an index + dash (e.g. " 1-01-02-06-18-19-46") if re.match(r'^\s*\d+\s*-', line): # extract all integer tokens (1 or 2 digits) nums = re.findall(r'\d{1,2}', line) if not nums: continue # Many files start the line with the ticket index; drop it if present and equals first num first_num_match = re.match(r'^\s*(\d+)', line) if first_num_match and nums and nums[0] == first_num_match.group(1): nums = nums[1:] if len(nums) < 5: # skip if fewer than 5 picks found continue picks = nums[:5] # take the first five numbers letters = [] for n_str in picks: n = int(n_str) # Map 1->A, 2->B, ... wrap/clamp if needed idx = (n - 1) % 26 letters.append(chr(ord('A') + idx)) if len(letters) >= 5: out_lines.append(" ".join(letters)) return "\n".join(out_lines) def expand_wheel_with_template(wheel_map: dict, template: str): """Expand wheel template into actual number combinations""" combos = [] lines = template.strip().split('\n') for line in lines: letters = line.strip().split() if len(letters) >= 5: combo = [] for letter in letters[:5]: # Take first 5 letters if letter in wheel_map: combo.append(wheel_map[letter]) if len(combo) == 5: combos.append(sorted(combo)) return combos # ------------------------- # Display helpers # ------------------------- def display_hot_cold_tables(hot_df: pd.DataFrame, cold_df: pd.DataFrame): hot_df.index = range(1, len(hot_df) + 1) hot_df.index.name = "No" cold_df.index = range(1, len(cold_df) + 1) cold_df.index.name = "No" with st.expander("🔥 Hot Numbers (Top 10)"): st.table(hot_df) with st.expander("❄️ Cold Numbers (Bottom 10)"): st.table(cold_df) def display_wheel_table_from_hotcold(hot_df: pd.DataFrame, cold_df: pd.DataFrame): """ Build the 20-number wheel mapping and show the table in the UI. Returns wheel_map (dict letter->number). """ hot = [(int(n), f) for n, f in hot_df.values] cold = [(int(n), f) for n, f in cold_df.values] wheel_map = select_20_wheel_numbers(hot, cold) wheel_labels = list("ABCDEFGHIJKLMNOPQRST") ordered_numbers = [wheel_map.get(l, None) for l in wheel_labels] wheel_df = pd.DataFrame([ordered_numbers], columns=wheel_labels) with st.expander("🎡 Your 20 Numbers to Wheel"): st.table(wheel_df) return wheel_map def display_wheel_combinations_from_raw(wheel_map: dict, raw_template_text: str): """ Convert numeric wheel template to letter-template, then expand and display combos. """ if not raw_template_text: st.warning("Wheel template file is empty or not found.") return letter_template = convert_numeric_wheel_to_letter_template(raw_template_text) if not letter_template: st.warning("Wheel template parsing found no valid ticket lines.") return combos = expand_wheel_with_template(wheel_map, letter_template) if not combos: st.warning("No combinations produced after expansion.") return df = pd.DataFrame(combos, columns=["Num1", "Num2", "Num3", "Num4", "Num5"]) df.index = [f"Ticket{i+1}" for i in range(len(df))] df.index.name = "No" with st.expander(f"🎟️ Wheel Combinations ({len(df)} tickets)"): st.dataframe(df) # ------------------------- # ------------------------- # Results parsing + hit tracking (UI-only, no layout/color changes) # ------------------------- def _parse_results_text(txt: str): """Parse results like '1-2-3-4-5 (6)' or '1 2 3 4 5 ⭐6'. Returns (main_set, star_int_or_None).""" if not txt: return set(), None s = str(txt).strip() if not s: return set(), None # Extract numbers; first 5 are main, last (optional) treated as star/bonus if present nums = re.findall(r"\d+", s) if len(nums) < 5: return set(), None main = [int(x) for x in nums[:5]] star = None if len(nums) >= 6: try: star = int(nums[5]) except Exception: star = None return set(main), star def _ticket_main_set(tk): """Return main numbers set from a ticket dict/list/string.""" if tk is None: return set() if isinstance(tk, dict): cand = tk.get('numbers') or tk.get('nums') or tk.get('main') or [] return set(int(x) for x in cand if str(x).isdigit() or isinstance(x, int)) if isinstance(tk, (list, tuple, set)): return set(int(x) for x in tk if str(x).isdigit() or isinstance(x, int)) if isinstance(tk, str): nums = re.findall(r"\d+", tk) return set(int(x) for x in nums[:5]) if len(nums) >= 5 else set() return set() def _ticket_star(tk): if isinstance(tk, dict): for k in ('star','bonus','power','pb','mb','sb','lucky'): if k in tk and tk.get(k) is not None: try: return int(tk.get(k)) except Exception: return None return None def _calc_hits(ticket, results_main:set, results_star, cfg=None): main = _ticket_main_set(ticket) main_hits = len(main & (results_main or set())) star_hit = None if results_star is not None: stv = _ticket_star(ticket) if stv is None and isinstance(ticket, dict): # fmt_ticket supports pulling star from dict; mirror that behavior loosely stv = ticket.get('star') if isinstance(ticket.get('star'), (int,str)) else None try: stv = int(stv) if stv is not None else None except Exception: stv = None if stv is not None: # Clamp against cfg star range (safety) if cfg is not None and getattr(cfg,'star_min',None) is not None and getattr(cfg,'star_max',None) is not None: try: smin, smax = int(cfg.star_min), int(cfg.star_max) if not (smin <= int(stv) <= smax): stv = None except Exception: pass star_hit = (stv == results_star) if stv is not None else None return main_hits, star_hit def _get_named_ticket(god_sets_list, wanted): w = (wanted or '').strip().lower() for _s in (god_sets_list or []): nm = (_s.get('style') or _s.get('name') or '').strip().lower() if nm == w: return _s return None def _build_recommended_plays(*, game_key:str, primary_nums, primary_star, god_sets_list, collapse_t, neighbor_t): """Default 5-ticket recipe per game (UI-only): balanced + tight + wide + collapse + neighbor.""" picks = [] # Prefer the engine-provided 'balanced' style; fall back to PRIMARY balanced = _get_named_ticket(god_sets_list, 'balanced') if balanced: picks.append(('balanced', balanced)) else: picks.append(('primary', {'numbers': primary_nums or [], 'star': primary_star})) tight = _get_named_ticket(god_sets_list, 'tight_cluster') if tight: picks.append(('tight_cluster', tight)) wide = _get_named_ticket(god_sets_list, 'wide_spread') if wide: picks.append(('wide_spread', wide)) if collapse_t: picks.append(('collapse', collapse_t)) if neighbor_t: picks.append(('neighbor', neighbor_t)) # Fill to 5 from remaining god sets (keeps variety) if len(picks) < 5: used = set() for lbl, tk in picks: used.add(tuple(sorted(_ticket_main_set(tk)))) for s in (god_sets_list or []): key = tuple(sorted(_ticket_main_set(s))) if key and key not in used: picks.append((s.get('style') or s.get('name') or 'set', s)) used.add(key) if len(picks) >= 5: break # Final de-dupe + cap seen = set() out = [] for lbl, tk in picks: key = tuple(sorted(_ticket_main_set(tk))) if not key or key in seen: continue seen.add(key) out.append((lbl, tk)) if len(out) >= 5: break return out def _scorecard_key(game_key: str): return f"scorecard_{game_key}" # UI & main logic # ------------------------- lotto_options = [ "G5 (Gimme 5)", "LA (Lotto America)", "L4L (Lucky for Life)", # ✅ NEW "MB (Megabucks)", "MM (Mega Millions)", "PB (Powerball)", ] lotto_type = st.selectbox("Select Lotto Type:", options=lotto_options, index=0) # Map display names -> keys used in GAME_CONFIGS and predict_for_game_v3 GAME_KEY_MAP = { "G5 (Gimme 5)": "gimme5", "LA (Lotto America)": "la", "L4L (Lucky for Life)": "l4l", # ✅ NEW "MB (Megabucks)": "mb", "MM (Mega Millions)": "mm", "PB (Powerball)": "pb", } try: game_key = GAME_KEY_MAP[lotto_type] data_path = DATA_PATHS[lotto_type] cfg = GAME_CONFIGS[game_key] # --- CSV change detection + age indicator (sidebar, no layout impact) --- csv_file = str(Path(data_path)) with st.sidebar: age_s = _csv_age_seconds(csv_file) if age_s is None: st.caption("📄 CSV age: unknown") else: st.caption(f"📄 CSV age: {_human_age(age_s)} ago") changed, changed_list = csv_changed_warning([csv_file], label="Draw history CSV") # Soft restart button (clears caches + reloads this session) if st.button("🔁 Restart (soft reload)"): soft_restart_app() # Optional extra nudge if a change was detected if changed: st.info("Tip: Soft restart reloads the app session; for a full process restart, stop and re-run Streamlit.") # Optional: clear cached data if CSV changed if changed: try: st.cache_data.clear() except Exception: pass # Load dataset using V3.0 loader df, _ = load_csv_for_game(Path(data_path), game_key) # Hot/cold numbers computed using our helper hot, cold = get_hot_and_cold_numbers(df, cfg) hot_df = pd.DataFrame(hot, columns=["Number", "Frequency"]) cold_df = pd.DataFrame(cold, columns=["Number", "Frequency"]) # UI options - simplified for V3.0 run_backtest = st.checkbox("🧪 Run Backtest (slower but shows model performance)", value=False) use_wheel = st.checkbox("🎡 Generate Wheel Combinations (if wheel.txt available)", value=False) wheel_raw_text = load_wheel_raw_text(DATA_PATHS["wheel_template"]) if use_wheel else "" # Styling st.markdown( """ """, unsafe_allow_html=True, ) if st.button("🎰 Generate Prediction" if not run_backtest else "🧪 Run Backtest"): with st.spinner("Building ensemble models and generating results..."): # Run V3.0 predictor result = predict_for_game_v3( csv_path=Path(data_path), game_key=game_key, run_backtest=run_backtest ) if run_backtest: # Display backtest results if 'error' in result: st.error(f"❌ Backtest Error: {result['error']}") else: st.success("✅ Backtest Complete!") # Show summary metrics st.subheader("📊 Backtest Summary") col1, col2, col3 = st.columns(3) with col1: st.metric("Model 3+ Matches", f"{result.get('model_3plus_rate', 0)}%") with col2: st.metric("Random 3+ Matches", f"{result.get('random_3plus_rate', 0)}%") with col3: st.metric("Even Count Accuracy", f"{result.get('even_count_accuracy', 0)}%") # Detailed hit rates st.subheader("🎯 Hit Rate Comparison") hit_data = [] for i in range(6): model_rate = result.get(f'model_hit_{i}_rate', 0) random_rate = result.get(f'random_hit_{i}_rate', 0) hit_data.append({ 'Matches': i, 'Model Rate (%)': model_rate, 'Random Rate (%)': random_rate, 'Improvement': f"+{model_rate - random_rate:.1f}%" if model_rate > random_rate else f"{model_rate - random_rate:.1f}%" }) hit_df = pd.DataFrame(hit_data) st.table(hit_df) # Raw results with st.expander("📋 Full Backtest Results"): st.json(result) else: # Display prediction results st.success(f"🧠 Predicted Numbers: {result['numbers']}") if result.get("star") is not None: star_col_name = cfg.star_col or 'Star' st.success(f"🌟 Predicted {star_col_name}: {result['star']}") else: st.info("ℹ️ No bonus number for this game") # Show additional info st.info(f"🔢 Total Sum: {sum(result['numbers'])} (Expected Range: {cfg.sum_min}–{cfg.sum_max})") model_info = result.get('model_info', {}) st.info(f"🤖 Models built for {model_info.get('numbers_modeled', 0)}/{model_info.get('total_possible', 0)} numbers") # ------------------------- # GOD MODE + CONSENSUS + ANCHORS (UI only) # ------------------------- god_sets = result.get("god_sets") or result.get("god_mode_sets") or result.get("godmode_sets") or [] strike = result.get("strike_tickets") or result.get("strike") or {} if god_sets: st.info("🎲 Lotto Cash Predictions") for s in god_sets: name = s.get("name", "set") nums = s.get("numbers") or [] st.info(f"{name}: {fmt_ticket({'numbers': nums, 'star': s.get('star', None)})}") # APPEND PATCH: EXTRA_PREDICTIONS_UI_V1 (APPEND-ONLY / NO DELETIONS) # Shows the two extra engine predictions (if present) without changing layout/colors. try: _extra = result.get("extra_predictions") if isinstance(_extra, dict) and (_extra.get("ml_ensemble") or _extra.get("dl_sequence")): st.info("➕ Extra Predictions") mlp = _extra.get("ml_ensemble") if isinstance(_extra.get("ml_ensemble"), dict) else None dlp = _extra.get("dl_sequence") if isinstance(_extra.get("dl_sequence"), dict) else None if mlp: st.info(f"ML Ensemble (RF/XGB): {fmt_ticket(mlp, cfg=cfg)}") if dlp: st.info(f"DL Sequence (LSTM/Transformer): {fmt_ticket(dlp, cfg=cfg)}") except Exception: pass collapse = None neighbor = None # Convergence tickets (engine may provide these directly) convergence_core = result.get("convergence_core") convergence_cooccur = result.get("convergence_cooccur") if isinstance(strike, dict): collapse = strike.get("collapse") or strike.get("consensus_collapse") or strike.get("CONSENSUS_COLLAPSE") neighbor = strike.get("neighbor") or strike.get("consensus_neighbor") or strike.get("CONSENSUS_NEIGHBOR") # If engine didn't supply these, allow them to come from strike payloads too if not convergence_core: convergence_core = strike.get("convergence_core") or strike.get("CONVERGENCE_CORE") if not convergence_cooccur: convergence_cooccur = strike.get("convergence_cooccur") or strike.get("CONVERGENCE_COOCCUR") or strike.get("CONVERGENCE_CO-OCCUR") if collapse or neighbor or convergence_core or convergence_cooccur: st.info("🎯 Consensus Tickets") if collapse: st.info(f"Consensus (Collapse): {fmt_ticket(collapse, cfg=cfg)}") if neighbor: st.info(f"Neighbor Consensus: {fmt_ticket(neighbor, cfg=cfg)}") if convergence_core: st.info(f"Convergence Core: {fmt_ticket(convergence_core, cfg=cfg)}") if convergence_cooccur: st.info(f"Convergence Co-Occur: {fmt_ticket(convergence_cooccur, cfg=cfg)}") main_min = int(getattr(cfg, "main_min", 1)) main_max = int(getattr(cfg, "main_max", 99)) seed = int(result.get("seed", 0) or 0) low_anchor = build_low_anchor(god_sets, main_min=main_min, main_max=main_max, seed=seed) mid_anchor = build_mid_anchor(god_sets, main_min=main_min, main_max=main_max, seed=seed) # Per-ticket Star Ball display (prevents the same star being shown for every line) def _pick_star_for_display(offset:int=0): try: if not getattr(cfg, "star_col", None): return None smin = int(getattr(cfg, "star_min", 1)) smax = int(getattr(cfg, "star_max", 1)) if smax < smin: return None rnd = random.Random(int(seed) + int(offset)) # Frequency weights: all-time + last-80 (recency boosted) series_all = pd.to_numeric(df[cfg.star_col], errors="coerce").dropna().astype(int).tolist() if not series_all: return rnd.randint(smin, smax) series_80 = pd.to_numeric(df[cfg.star_col].tail(80), errors="coerce").dropna().astype(int).tolist() from collections import Counter fa = Counter(series_all) f8 = Counter(series_80) candidates = list(range(smin, smax + 1)) weights = [] for b in candidates: w = 1.0 + float(fa.get(b, 0)) + 2.0 * float(f8.get(b, 0)) # gentle low-zone preference for LA / MM (matches engine intent) if getattr(cfg, "name", "") in ("Lotto America", "Mega Millions") and b <= (smin + 4): w *= 1.10 weights.append(w) return int(rnd.choices(candidates, weights=weights, k=1)[0]) except Exception: return None # ------------------------- # Recommended Plays (per-game, UI-only) # ------------------------- recommended = None if game_key == "gimme5": # If G5 play5 was built above, reuse it; else fall back to default recipe try: recommended = play5 if 'play5' in locals() else None except Exception: recommended = None if not recommended: recommended = _build_recommended_plays( game_key=game_key, primary_nums=result.get("numbers") or [], primary_star=result.get("star"), god_sets_list=god_sets, collapse_t=collapse, neighbor_t=neighbor, ) if recommended: st.info("🏷️ Recommended Plays (5-ticket recipe)") for lbl, tk in recommended: st.info(f"{lbl}: {fmt_ticket(tk, cfg=cfg)}") # ------------------------- # Optional: Results entry + hit highlight + rolling 30-draw scorecard (silent) # ------------------------- with st.expander("✅ Results & Hit Tracking (optional)"): results_input = st.text_input( "Paste official results (ex: 12-17-25-34-42 (9))", key=f"results_input_{game_key}", ) if st.button("Apply Results", key=f"apply_results_{game_key}"): main_set, star_val = _parse_results_text(results_input) if not main_set: st.warning("Couldn’t parse results. Example: 12-17-25-34-42 (9)") else: # Store last results in session st.session_state[f"last_results_{game_key}"] = {"main": sorted(main_set), "star": star_val, "ts": datetime.now().isoformat()} # Update rolling scorecard (30) sk = _scorecard_key(game_key) sc = st.session_state.get(sk, []) # Score recommended plays scored = [] for lbl, tk in (recommended or []): mh, sh = _calc_hits(tk, main_set, star_val, cfg=cfg) scored.append({"label": lbl, "main_hits": mh, "star_hit": sh}) sc.append({ "time": datetime.now().strftime('%Y-%m-%d %H:%M'), "results": "-".join(str(x) for x in sorted(main_set)) + (f" (⭐ {star_val})" if star_val is not None else ""), "best_main_hits": max([x["main_hits"] for x in scored], default=0), }) st.session_state[sk] = sc[-30:] st.success("Saved. Scroll down for highlights and scorecard.") # Show highlight if we have stored results stored = st.session_state.get(f"last_results_{game_key}") if stored: rmain = set(stored.get("main") or []) rstar = stored.get("star") st.write(f"**Last results:** {'-'.join(str(x) for x in sorted(rmain))}" + (f" (⭐ {rstar})" if rstar is not None else "")) if recommended: st.write("**Auto-highlight hits (Recommended Plays):**") for lbl, tk in recommended: mh, sh = _calc_hits(tk, rmain, rstar, cfg=cfg) hit_note = f"{mh} hit" + ("s" if mh != 1 else "") if sh is True: hit_note += " + ⭐" st.write(f"- {lbl}: {fmt_ticket(tk, cfg=cfg)} → {hit_note}") # Silent rolling 30-draw scorecard sk = _scorecard_key(game_key) sc = st.session_state.get(sk, []) if sc: st.write("**Rolling scorecard (last 30 saves):**") st.dataframe(pd.DataFrame(sc), use_container_width=True) st.info("🎟️ Anchor Tickets") st.info(f"LOW Anchor (1–9): {fmt_ticket({'numbers': low_anchor, 'star': _pick_star_for_display(101)})}") st.info(f"MID Anchor (10–19): {fmt_ticket({'numbers': mid_anchor, 'star': _pick_star_for_display(202)})}") lines = [] lines.append(f"GAME: {game_key}") lines.append(f"GENERATED: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") lines.append("") lines.append(f"PRIMARY: {fmt_ticket(result.get('numbers') or [], result.get('star'))}") if god_sets: lines.append("") lines.append("LOTTO CASH SETS:") for s in god_sets: lines.append(f"- {s.get('name','set')}: {fmt_ticket(s, cfg=cfg)}") if collapse or neighbor or (convergence_core is not None) or (convergence_cooccur is not None): lines.append("") lines.append("CONSENSUS:") if collapse: lines.append(f"- collapse: {fmt_ticket(collapse, cfg=cfg)}") if neighbor: lines.append(f"- neighbor: {fmt_ticket(neighbor, cfg=cfg)}") if convergence_core is not None: lines.append(f"- convergence_core: {fmt_ticket(convergence_core, cfg=cfg)}") if convergence_cooccur is not None: lines.append(f"- convergence_cooccur: {fmt_ticket(convergence_cooccur, cfg=cfg)}") lines.append("") lines.append(f"LOW ANCHOR (1–9): {fmt_ticket({'numbers': low_anchor, 'star': _pick_star_for_display(101)})}") lines.append(f"MID ANCHOR (10–19): {fmt_ticket({'numbers': mid_anchor, 'star': _pick_star_for_display(202)})}") content = "\n".join(lines).strip() + "\n" fname = f"{game_key}_tickets_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" st.download_button("⬇️ Download tickets (timestamped)", data=content.encode("utf-8"), file_name=fname, mime="text/plain") # ------------------------------------------------------------ # NITRO PACK download add-on (append-only; does not change layout) # Creates an additional download option that includes Nitro Pack tickets # if the engine provided them in the result payload. # ------------------------------------------------------------ try: nitro_pack = ( result.get("nitro_pack") or result.get("nitro") or result.get("nitro_pack_tickets") or result.get("nitro_tickets") ) if isinstance(nitro_pack, dict): # allow {"tickets":[...]} or {"sets":[...]} nitro_pack = nitro_pack.get("tickets") or nitro_pack.get("sets") or nitro_pack.get("pack") or [] if nitro_pack and isinstance(nitro_pack, (list, tuple)): lines2 = list(lines) lines2.append("") lines2.append("NITRO PACK:") for i_np, tk_np in enumerate(nitro_pack, start=1): try: lines2.append(f"- nitro_{i_np}: {fmt_ticket(tk_np, cfg=cfg)}") except Exception: # fall back if tk_np is just a list of numbers try: lines2.append(f"- nitro_{i_np}: {fmt_ticket({'numbers': tk_np, 'star': None}, cfg=cfg)}") except Exception: pass content2 = "\n".join(lines2).strip() + "\n" fname2 = f"{game_key}_tickets_NITRO_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" st.download_button( "⬇️ Download tickets + Nitro Pack", data=content2.encode("utf-8"), file_name=fname2, mime="text/plain", ) except Exception: pass # Wheel (if requested) if use_wheel and wheel_raw_text: # Build and show wheel table (A..T mapping) wheel_map = display_wheel_table_from_hotcold(hot_df, cold_df) # Expand numeric wheel.txt to letter-template and display combos display_wheel_combinations_from_raw(wheel_map, wheel_raw_text) elif use_wheel: st.warning("⚠️ Wheel template file (wheel.txt) not found or empty") except FileNotFoundError: st.error(f"❌ File not found: `{data_path}`") except Exception as e: st.error(f"⚠️ Error: {str(e)}") import traceback st.error(f"Details: {traceback.format_exc()}") # ============================================================ # APPEND PATCH: NITRO_PACK_UI_V2 (APPEND-ONLY / NO DELETIONS) # - Patches lotto_predictor.predict_for_game_v3 inside sys.modules so that # future Streamlit reruns import the wrapped function automatically. # - Stores last engine result in st.session_state. # - Renders 4 additional prediction lines below existing UI. # ============================================================ def _NITRO_UI__patch_engine_predictor(): try: import sys as _sys _mod = _sys.modules.get("lotto_predictor") if _mod is None: return _orig = getattr(_mod, "predict_for_game_v3", None) if _orig is None or getattr(_orig, "_nitro_wrapped", False): return def _wrapped_predict_for_game_v3(*args, **kwargs): res = _orig(*args, **kwargs) try: st.session_state["_nitro_last_result"] = res gk = None if "game_key" in kwargs: gk = kwargs.get("game_key") elif len(args) >= 2: gk = args[1] st.session_state["_nitro_last_game_key"] = gk except Exception: pass return res _wrapped_predict_for_game_v3._nitro_wrapped = True # type: ignore setattr(_mod, "predict_for_game_v3", _wrapped_predict_for_game_v3) except Exception: return def _NITRO_UI__get_ticket(res: dict, key: str): if not isinstance(res, dict): return None tk = res.get(key) if tk: return tk npk = res.get("nitro_pack") if isinstance(npk, dict): return npk.get(key) return None def _NITRO_UI__render(): res = st.session_state.get("_nitro_last_result") if not isinstance(res, dict) or ("error" in res): return gk = st.session_state.get("_nitro_last_game_key") try: if "game_key" in globals(): gk = globals().get("game_key") except Exception: pass cfg_local = None try: if gk and "GAME_CONFIGS" in globals(): cfg_local = GAME_CONFIGS.get(gk) except Exception: cfg_local = None mk = _NITRO_UI__get_ticket(res, "markov_addon") rf = _NITRO_UI__get_ticket(res, "rf_line") jc = _NITRO_UI__get_ticket(res, "jackpot_chase") st.info("⚡ Nitro Pack (4 extra lines)") st.info(f"🔁 Markov Add-on: {fmt_ticket(mk, cfg=cfg_local) if mk else '(not available for this run)'}") st.info(f"🌲 RF-only ML Line: {fmt_ticket(rf, cfg=cfg_local) if rf else '(not available for this run)'}") if jc: label = None try: label = jc.get("mode") except Exception: label = None prefix = "🚀 Jackpot Chase Mode (optional)" if not label else f"🚀 {label}" st.info(f"{prefix}: {fmt_ticket(jc, cfg=cfg_local)}") else: st.info("🚀 Jackpot Chase Mode (optional): (not available for this run)") try: pasted = "" if gk: pasted = st.session_state.get(f"results_input_{gk}", "") main_set, _star = _parse_results_text(pasted) if main_set: engine_primary = {"numbers": res.get("numbers") or [], "star": res.get("star")} eng_hits = len(_ticket_main_set(engine_primary) & main_set) rf_hits = len(_ticket_main_set(rf) & main_set) if rf else 0 mk_hits = len(_ticket_main_set(mk) & main_set) if mk else 0 jc_hits = len(_ticket_main_set(jc) & main_set) if jc else 0 st.info(f"⚖️ Live compare vs pasted results: Engine={eng_hits}/5 • RF={rf_hits}/5 • Markov={mk_hits}/5 • Chase={jc_hits}/5") else: st.info("⚖️ Live compare (paste official results to score Engine vs RF vs Markov vs Chase)") except Exception: st.info("⚖️ Live compare (paste official results to score Engine vs RF vs Markov vs Chase)") # Patch engine module for future reruns, then render any stored output try: _NITRO_UI__patch_engine_predictor() except Exception: pass try: _NITRO_UI__render() except Exception: pass # ========================= # NITRO PATCH v3 (append-only) # Adds an explicit checkbox/tickbox for Live Compare display. # Does NOT change any layout above; this renders at the very bottom. # ========================= def _NITRO_UI__live_compare_checkbox_panel(): try: import streamlit as st except Exception: return try: # Show a small optional panel at the bottom so user has a clear tickbox. with st.expander("⚖️ Live Compare (optional)", expanded=False): enabled = st.checkbox( "Enable Live compare (paste official results to score Engine vs RF vs Markov vs Chase)", value=False, key="nitro_live_compare_enabled", help="If enabled, this panel will show the live hit counts based on the last saved results.", ) st.caption("Tip: Use the existing '✅ Results & Hit Tracking (optional)' expander above, paste official results, then click 'Apply Results'.") if not enabled: return # Try to infer the active game_key from session by looking for a last_results_* entry. # This is safe and does not require changing any code above. last_keys = [k for k in st.session_state.keys() if isinstance(k, str) and k.startswith("last_results_")] if not last_keys: st.info("No saved results yet. Paste official results above and click 'Apply Results' first.") return # Prefer the most recently updated key if multiple exist. def _ts(k): try: v = st.session_state.get(k) or {} return v.get("ts") or "" except Exception: return "" last_keys_sorted = sorted(last_keys, key=_ts, reverse=True) game_key = last_keys_sorted[0].replace("last_results_", "", 1) stored = st.session_state.get(f"last_results_{game_key}") or {} main = set(stored.get("main") or []) if not main: st.info("Saved results found but could not parse main numbers.") return # Try to fetch the last predictions we rendered for this game from session (if present), # else do a best-effort pull from commonly used session keys. # We will display 'N/A' if we can't find them; still satisfies checkbox presence. pred = st.session_state.get(f"nitro_last_predictions_{game_key}") or st.session_state.get("nitro_last_predictions") or {} eng = pred.get("engine") or pred.get("primary") or pred.get("balanced") rf = pred.get("rf") mk = pred.get("markov") jc = pred.get("chase") or pred.get("jackpot_chase") def _main_set(tk): try: if tk is None: return set() if isinstance(tk, dict): nums = tk.get("numbers") or tk.get("nums") or tk.get("main") or [] elif isinstance(tk, (list, tuple)): nums = list(tk) else: nums = [int(x) for x in re.findall(r"\d+", str(tk))][:5] return set(int(x) for x in nums if str(x).strip() != "") except Exception: return set() eng_hits = len(_main_set(eng) & main) if eng else None rf_hits = len(_main_set(rf) & main) if rf else None mk_hits = len(_main_set(mk) & main) if mk else None jc_hits = len(_main_set(jc) & main) if jc else None # Display in a clear card-like info line parts = [] parts.append(f"Engine={'N/A' if eng_hits is None else str(eng_hits) + '/5'}") parts.append(f"RF={'N/A' if rf_hits is None else str(rf_hits) + '/5'}") parts.append(f"Markov={'N/A' if mk_hits is None else str(mk_hits) + '/5'}") parts.append(f"Chase={'N/A' if jc_hits is None else str(jc_hits) + '/5'}") st.success(" • ".join(parts)) st.caption("If any are N/A, it means that line wasn't cached in session for this run (the main UI line still works).") except Exception: # Never break the app return # Render the checkbox panel last try: _NITRO_UI__live_compare_checkbox_panel() except Exception: pass # ====================================================================================== # NITRO PACK V4 ADD-ON (append-only) # - Performance memory: keep last N scored results for Engine/RF/Markov/Chase # - Auto Chase helper: optional checkbox + additional download that includes # a "recommended" bundle when the engine has been cold # - RF confidence display (if provided by engine) # NOTE: This block is strictly append-only and does not modify any code above. # ====================================================================================== def _nitro__safe_fmt_nums(nums): try: if nums is None: return "" if isinstance(nums, dict): nums = nums.get("numbers") or nums.get("nums") or nums.get("main") or [] if isinstance(nums, (tuple, list)): return "-".join(str(int(x)) for x in nums) return "-".join(re.findall(r"\d+", str(nums))[:5]) except Exception: return "" def _nitro__hits(pred_ticket, result_main_set): try: if pred_ticket is None: return None if isinstance(pred_ticket, dict): nums = pred_ticket.get("numbers") or pred_ticket.get("nums") or pred_ticket.get("main") or [] elif isinstance(pred_ticket, (list, tuple)): nums = list(pred_ticket) else: nums = [int(x) for x in re.findall(r"\d+", str(pred_ticket))][:5] s = set(int(x) for x in nums if str(x).strip() != "") return int(len(s & set(result_main_set))) except Exception: return None def _NITRO_UI__cache_last_predictions(): """Cache the last rendered predictions so Live Compare + performance scoring can work.""" try: res = st.session_state.get("_nitro_last_result") gk = st.session_state.get("_nitro_last_game_key") if not isinstance(res, dict) or not gk: return # Extract best-known tickets engine_ticket = ( res.get("primary") or (res.get("god_mode", {}) or {}).get("balanced") or (res.get("god_mode_sets", {}) or {}).get("balanced") ) nitro_pack = res.get("nitro_pack") if isinstance(res.get("nitro_pack"), dict) else {} mk = res.get("markov_addon") or (nitro_pack.get("markov_addon") if isinstance(nitro_pack, dict) else None) rf = res.get("rf_line") or (nitro_pack.get("rf_line") if isinstance(nitro_pack, dict) else None) jc = res.get("jackpot_chase") or (nitro_pack.get("jackpot_chase") if isinstance(nitro_pack, dict) else None) st.session_state[f"nitro_last_predictions_{gk}"] = { "engine": engine_ticket, "rf": rf, "markov": mk, "chase": jc, "rf_confidence": res.get("rf_confidence") or (rf.get("confidence") if isinstance(rf, dict) else None), "ts": datetime.now().isoformat(), } # Also keep a generic last snapshot for safety st.session_state["nitro_last_predictions"] = st.session_state[f"nitro_last_predictions_{gk}"] except Exception: return def _NITRO_UI__score_if_new_results(): """If the user saved official results, score the cached lines and append to history.""" try: last_keys = [k for k in st.session_state.keys() if isinstance(k, str) and k.startswith("last_results_")] if not last_keys: return def _ts(k): try: v = st.session_state.get(k) or {} return v.get("ts") or "" except Exception: return "" last_key = sorted(last_keys, key=_ts, reverse=True)[0] gk = last_key.replace("last_results_", "", 1) stored = st.session_state.get(last_key) or {} main = stored.get("main") or [] rts = stored.get("ts") or "" if not main or not rts: return pred = st.session_state.get(f"nitro_last_predictions_{gk}") or {} scored_key = f"nitro_last_scored_ts_{gk}" if st.session_state.get(scored_key) == rts: return entry = { "ts": rts, "engine": _nitro__hits(pred.get("engine"), main), "rf": _nitro__hits(pred.get("rf"), main), "markov": _nitro__hits(pred.get("markov"), main), "chase": _nitro__hits(pred.get("chase"), main), } hist_key = f"nitro_score_history_{gk}" hist = st.session_state.get(hist_key) or [] if not isinstance(hist, list): hist = [] hist.append(entry) st.session_state[hist_key] = hist[-12:] st.session_state[scored_key] = rts except Exception: return def _NITRO_UI__performance_panel(): try: _NITRO_UI__cache_last_predictions() _NITRO_UI__score_if_new_results() gk = st.session_state.get("_nitro_last_game_key") if not gk: hist_keys = [k for k in st.session_state.keys() if isinstance(k, str) and k.startswith("nitro_score_history_")] if not hist_keys: return gk = hist_keys[-1].replace("nitro_score_history_", "", 1) hist = st.session_state.get(f"nitro_score_history_{gk}") or [] if not isinstance(hist, list): hist = [] with st.expander("📈 Nitro Pack Performance (optional)", expanded=False): st.caption("Scores update automatically after you paste official results and click 'Apply Results'.") if not hist: st.info("No scored results yet. Paste official results above and click 'Apply Results' to start tracking.") return def _avg(field): vals = [h.get(field) for h in hist if isinstance(h.get(field), int)] return (sum(vals) / len(vals)) if vals else None a_eng = _avg("engine") a_rf = _avg("rf") a_mk = _avg("markov") a_jc = _avg("chase") st.success( "Averages (last {}): Engine={} RF={} Markov={} Chase={}".format( len(hist), "N/A" if a_eng is None else f"{a_eng:.2f}/5", "N/A" if a_rf is None else f"{a_rf:.2f}/5", "N/A" if a_mk is None else f"{a_mk:.2f}/5", "N/A" if a_jc is None else f"{a_jc:.2f}/5", ) ) pred = st.session_state.get(f"nitro_last_predictions_{gk}") or {} rf_conf = pred.get("rf_confidence") if rf_conf is not None: try: st.info(f"RF Confidence: {float(rf_conf):.1f}% (heuristic from RF probabilities)") except Exception: st.info(f"RF Confidence: {rf_conf}") for h in reversed(hist[-10:]): st.write( f"{h.get('ts','')[:19]} • Engine={h.get('engine','N/A')}/5 • RF={h.get('rf','N/A')}/5 • Markov={h.get('markov','N/A')}/5 • Chase={h.get('chase','N/A')}/5" ) cold = (a_eng is not None and a_eng < 1.0) auto = st.checkbox( "🚀 Auto-suggest Jackpot Chase when Engine is cold (avg < 1.0 hit)", value=bool(st.session_state.get("nitro_auto_chase", False)), key="nitro_auto_chase", help="Does not change your main output; it only adds a helper suggestion + a separate download option below.", ) if auto and cold: st.warning("Engine has been cold recently. Jackpot Chase is suggested for upside (optional).") try: pred2 = st.session_state.get(f"nitro_last_predictions_{gk}") or {} lines = [] lines.append(f"GAME: {gk}") lines.append(f"GENERATED: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") lines.append("") lines.append(f"ENGINE: {_nitro__safe_fmt_nums(pred2.get('engine'))}") lines.append(f"MARKOV: {_nitro__safe_fmt_nums(pred2.get('markov'))}") lines.append(f"RF: {_nitro__safe_fmt_nums(pred2.get('rf'))}") lines.append(f"CHASE: {_nitro__safe_fmt_nums(pred2.get('chase'))}") lines.append("") if auto and cold: lines.append("RECOMMENDED: ENGINE + CHASE (Engine cold; Chase optional)") content = "\n".join(lines).strip() + "\n" st.download_button( "⬇️ Download Nitro Lines (Engine/RF/Markov/Chase)", data=content.encode("utf-8"), file_name=f"{gk}_nitro_lines_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", mime="text/plain", ) except Exception: pass except Exception: return try: _NITRO_UI__performance_panel() except Exception: pass