#!/usr/bin/env python3
r"""
predictor.py — Universal RandomForest (CSV-only) predictor for lottery-style games.

What it does
- Reads a game's CSV draw history directly (no dependence on your engine pools).
- Trains RandomForestClassifier models to estimate the probability each number will appear next draw.
- Produces:
    * RF-1: Top-N numbers by probability (most likely)
    * RF-2: Diversified RF (sampled from top pool; differs from RF-1 by >=2 numbers by default)

Designed to be robust across many CSV formats:
- Columns like n1..n5 / num1..num5 / ball1..ball5
- Any 5 numeric columns (fallback)
- A single column containing a hyphen/space/comma separated list of numbers (fallback heuristic)

Optional bonus support
- If a bonus column exists (e.g., megaball/powerball/starball), you can pass bonus_max and it will also
  train a bonus RF and return bonus prediction.

Requirements
- pandas
- scikit-learn
- numpy

If scikit-learn isn't installed, predict() returns a result with ok=False and empty/None
predictions (safe failure).

Usage (import)
    from predictor import UniversalRFPredictor
    rf = UniversalRFPredictor()
    out = rf.predict(csv_path, main_max=52, main_n=5, bonus_max=10, bonus_n=1)
    print(out["rf1_numbers"], out.get("rf1_bonus"))
    print(out["rf2_numbers"], out.get("rf2_bonus"))

Usage (CLI)
    python predictor.py --csv "E:\data\la_results.csv" --main-max 52 --main-n 5 --bonus-max 10 --bonus-n 1
"""

from __future__ import annotations

import argparse
import hashlib
import os
import re
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple


# ----------------------------- helpers -----------------------------

# Splits on any run of non-digit characters ("1-2-3", "1 2 3", "1,2,3", ...).
_SPLIT_RE = re.compile(r"[^0-9]+")


def _safe_int(x: Any) -> Optional[int]:
    """Convert *x* to an int, returning None when that is not possible.

    None and bool values are rejected outright. Anything else is first passed
    to ``int()``; if that fails, its stripped string form is parsed through
    ``float()`` so that values such as ``"3.0"`` still convert.
    """
    if x is None or isinstance(x, bool):
        return None
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        text = str(x).strip()
        if not text:
            return None
        return int(float(text))
    except (TypeError, ValueError, OverflowError):
        return None


def _dedupe(seq: Sequence[Any]) -> List[int]:
    """Return the int-convertible items of *seq*, first occurrences only, in order."""
    out: List[int] = []
    seen = set()
    for v in (seq or []):
        iv = _safe_int(v)
        if iv is None or iv in seen:
            continue
        seen.add(iv)
        out.append(iv)
    return out


def _md5_seed(s: str, fallback: int = 1337) -> int:
    """Derive a deterministic 32-bit seed from the first 8 hex digits of md5(*s*).

    Returns *fallback* if hashing fails for any reason (best-effort by design).
    """
    try:
        return int(hashlib.md5(s.encode("utf-8")).hexdigest()[:8], 16)
    except Exception:
        return fallback


def _clip(nums: Sequence[int], lo: int, hi: int) -> List[int]:
    """Return the items of *nums* that lie in the inclusive range [lo, hi]."""
    return [n for n in nums if lo <= n <= hi]


def _recent_count(arr: Sequence[int], t: int, w: int) -> int:
    """Count appearances in the *w* steps immediately before index *t*."""
    return int(sum(arr[max(0, t - w):t]))


def _gap_since(arr: Sequence[int], t: int) -> int:
    """Return how many steps before *t* the last 1 occurred (or *t* if never)."""
    for k in range(1, t + 1):
        if arr[t - k] == 1:
            return k
    return t


def _feature_row(arr: Sequence[int], t: int) -> List[int]:
    """Build the 6-value feature vector describing *arr* just before index *t*.

    Layout: count in last 5, count in last 10, gap since last appearance, and
    the three most recent 0/1 indicators (0 where the history is too short).
    """
    return [
        _recent_count(arr, t, 5),
        _recent_count(arr, t, 10),
        _gap_since(arr, t),
        arr[t - 1] if t - 1 >= 0 else 0,
        arr[t - 2] if t - 2 >= 0 else 0,
        arr[t - 3] if t - 3 >= 0 else 0,
    ]


# ----------------------------- CSV parsing -----------------------------

@dataclass
class ParsedHistory:
    draws: List[List[int]]          # list of main number lists
    bonus: List[Optional[int]]      # list of bonus values aligned with draws


class CSVHistoryReader:
    """Best-effort reader for many draw-history CSV layouts."""

    DEFAULT_MAIN_PATTERNS = [
        ["n1", "n2", "n3", "n4", "n5"],
        ["num1", "num2", "num3", "num4", "num5"],
        ["ball1", "ball2", "ball3", "ball4", "ball5"],
        ["w1", "w2", "w3", "w4", "w5"],
    ]

    # Prefixes used to generate "<prefix>1..<prefix>N" names for any main_n,
    # so games with more than five main numbers are matched too.
    DEFAULT_MAIN_PREFIXES = ["n", "num", "ball", "w"]

    DEFAULT_BONUS_CANDIDATES = [
        "mb", "megaball", "mega_ball",
        "pb", "powerball", "power_ball",
        "sb", "star", "starball", "star_ball",
        "bonus", "bonusball", "bonus_ball",
        "luckyball", "lucky_ball",
    ]

    def __init__(self, verbose: bool = False):
        self.verbose = verbose

    def _candidate_patterns(self, main_n: int) -> List[List[str]]:
        """Return lower-case column-name patterns to try for *main_n* main numbers."""
        patterns = [
            list(pat[:main_n])
            for pat in self.DEFAULT_MAIN_PATTERNS
            if len(pat) >= main_n
        ]
        for prefix in self.DEFAULT_MAIN_PREFIXES:
            patterns.append([f"{prefix}{i}" for i in range(1, main_n + 1)])
        return patterns

    def _find_main_columns(
        self,
        df: Any,
        main_n: int,
        lower_cols: Dict[str, Any],
        bonus_col: Optional[Any],
    ) -> Optional[List[Any]]:
        """Locate *main_n* main-number columns by name, else by numeric dtype."""
        # 1) Well-known names, matched case-insensitively.
        for pat in self._candidate_patterns(main_n):
            if all(name in lower_cols for name in pat):
                return [lower_cols[name] for name in pat]

        # 2) Fallback: the first `main_n` numeric columns. The detected bonus
        #    column is skipped so it is never mistaken for a main number.
        numeric_cols = []
        for c in df.columns:
            if bonus_col is not None and c == bonus_col:
                continue
            try:
                kind = getattr(df[c].dtype, "kind", "")
            except (AttributeError, KeyError):
                kind = ""
            if kind in ("i", "u", "f"):
                numeric_cols.append(c)
        if len(numeric_cols) >= main_n:
            return numeric_cols[:main_n]
        return None

    def _find_list_column(self, df: Any, main_n: int) -> Optional[Any]:
        """Find a text column whose cells look like "1-2-3-4-5" / "1 2 3 4 5"."""
        from pandas.api.types import is_object_dtype, is_string_dtype  # type: ignore

        for c in df.columns:
            # Accept both legacy object columns and dedicated string dtypes.
            if not (is_object_dtype(df[c]) or is_string_dtype(df[c])):
                continue
            sample = df[c].dropna().astype(str).head(10).tolist()
            hits = 0
            for s in sample:
                parts = [p for p in _SPLIT_RE.split(s) if p]
                if len(parts) >= main_n and all(p.isdigit() for p in parts[:main_n]):
                    hits += 1
            # At least half of the sampled cells (and at least one) must match.
            if hits >= max(1, len(sample) // 2):
                return c
        return None

    def read(self, csv_path: str, main_n: int = 5) -> ParsedHistory:
        """Parse *csv_path* into main draws and an aligned bonus series.

        Returns an empty ParsedHistory when pandas is unavailable, the file
        cannot be read, or no usable layout is recognised. Rows whose main
        numbers cannot all be converted to int are skipped. A bonus entry is
        None when no bonus column was detected or the cell is not an int.
        """
        empty = ParsedHistory(draws=[], bonus=[])
        if main_n <= 0:
            return empty

        try:
            import pandas as pd  # type: ignore
        except Exception:
            # Optional dependency: fail safe rather than crash the caller.
            return empty

        try:
            df = pd.read_csv(csv_path)
        except Exception:
            return empty

        lower_cols = {str(c).strip().lower(): c for c in df.columns}

        # Bonus column detection (done first so the numeric fallback can skip it).
        bonus_col: Optional[Any] = None
        for cand in self.DEFAULT_BONUS_CANDIDATES:
            if cand in lower_cols:
                bonus_col = lower_cols[cand]
                break

        main_cols = self._find_main_columns(df, main_n, lower_cols, bonus_col)
        list_col = None if main_cols is not None else self._find_list_column(df, main_n)
        if main_cols is None and list_col is None:
            return empty  # Could not parse

        # Read the bonus values from the full frame; the main-column slice
        # does not contain the bonus column.
        if bonus_col is not None and bonus_col != list_col:
            bonus_values = df[bonus_col].tolist()
        else:
            bonus_values = [None] * len(df)

        draws: List[List[int]] = []
        bonus: List[Optional[int]] = []

        if main_cols is not None:
            columns = [df[c].tolist() for c in main_cols]
            for values, raw_bonus in zip(zip(*columns), bonus_values):
                nums = [_safe_int(v) for v in values]
                if len(nums) == main_n and all(n is not None for n in nums):
                    draws.append(nums)
                    bonus.append(_safe_int(raw_bonus))
        else:
            for raw, raw_bonus in zip(df[list_col].tolist(), bonus_values):
                parts = [p for p in _SPLIT_RE.split(str(raw)) if p]
                if len(parts) < main_n:
                    continue
                nums = [_safe_int(p) for p in parts[:main_n]]
                if all(n is not None for n in nums):
                    draws.append(nums)
                    # A bonus embedded in the list itself is not extracted;
                    # only a separate bonus column is used.
                    bonus.append(_safe_int(raw_bonus))

        return ParsedHistory(draws=draws, bonus=bonus)


# ----------------------------- RF core -----------------------------

class UniversalRFPredictor:
    """RandomForest-based ranking of main (and optionally bonus) numbers."""

    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        self.reader = CSVHistoryReader(verbose=verbose)

    def _build_features(self, draws: List[List[int]], universe_max: int, lookback: int = 12):
        """Build per-number time-series features and next-step feature vector.

        Returns a dict mapping each number in 1..universe_max to a tuple
        ``(X, y, last_f)``: training rows, 0/1 targets, and the feature vector
        for the next (unseen) draw. Returns ``{}`` when there are fewer than
        ``lookback + 5`` draws.
        """
        import numpy as np  # type: ignore

        if len(draws) < (lookback + 5):
            return {}

        total = len(draws)
        appears = {n: [0] * total for n in range(1, universe_max + 1)}
        for t, d in enumerate(draws):
            for n in set(d):
                if 1 <= n <= universe_max:
                    appears[n][t] = 1

        feats = {}
        for n in range(1, universe_max + 1):
            arr = appears[n]
            X = [_feature_row(arr, t) for t in range(lookback, total)]
            y = [arr[t] for t in range(lookback, total)]
            last_f = _feature_row(arr, total)
            feats[n] = (np.asarray(X, float), np.asarray(y, int), np.asarray(last_f, float))
        return feats

    def _rank_numbers(self, draws: List[List[int]], universe_max: int, seed: int) -> List[Tuple[int, float]]:
        """Return ``(number, probability)`` pairs sorted by descending probability.

        Numbers with fewer than 5 positive or 5 negative training examples are
        skipped, as are numbers whose model fails to fit. Returns ``[]`` when
        scikit-learn/numpy are unavailable or there is too little history.
        """
        try:
            from sklearn.ensemble import RandomForestClassifier  # type: ignore
            import numpy as np  # type: ignore  # noqa: F401 - availability check for _build_features
        except Exception:
            return []

        feats = self._build_features(draws, universe_max, lookback=12)
        if not feats:
            return []

        probs: List[Tuple[int, float]] = []
        for n, (X, y, last_f) in feats.items():
            if int(y.sum()) < 5 or int((1 - y).sum()) < 5:
                continue
            try:
                clf = RandomForestClassifier(
                    n_estimators=240,
                    max_depth=9,
                    random_state=seed,
                    class_weight="balanced",
                    n_jobs=-1,
                )
                clf.fit(X, y)
                p = float(clf.predict_proba(last_f.reshape(1, -1))[0][1])
                probs.append((n, p))
            except Exception:
                # Best-effort: a number whose model cannot be fit is left out.
                continue

        probs.sort(key=lambda item: item[1], reverse=True)
        return probs

    def _rank_bonus(self, bonus_series: List[Optional[int]], bonus_max: int, seed: int) -> List[Tuple[int, float]]:
        """Simple RF for bonus (single categorical per draw).

        Returns ``(value, probability)`` pairs sorted by descending
        probability, or ``[]`` when dependencies are missing or fewer than 40
        bonus observations are available.
        """
        try:
            from sklearn.ensemble import RandomForestClassifier  # type: ignore
            import numpy as np  # type: ignore
        except Exception:
            return []

        # Need enough bonus observations
        b = [_safe_int(x) for x in bonus_series]
        if sum(1 for x in b if x is not None) < 40:
            return []

        T = len(b)
        lookback = 10
        probs: List[Tuple[int, float]] = []
        for val in range(1, bonus_max + 1):
            # Appearance series for this bonus value.
            arr = [1 if v == val else 0 for v in b]
            rows = [_feature_row(arr, t) for t in range(lookback, T)]
            if len(rows) < 30:
                continue
            X = np.asarray(rows, float)
            y = np.asarray(arr[lookback:T], int)
            if int(y.sum()) < 3 or int((1 - y).sum()) < 3:
                continue

            last_f = np.asarray(_feature_row(arr, T), float).reshape(1, -1)
            try:
                clf = RandomForestClassifier(
                    n_estimators=200,
                    max_depth=8,
                    random_state=seed,
                    class_weight="balanced",
                    n_jobs=-1,
                )
                clf.fit(X, y)
                p = float(clf.predict_proba(last_f)[0][1])
                probs.append((val, p))
            except Exception:
                # Best-effort: skip values whose model cannot be fit.
                continue

        probs.sort(key=lambda item: item[1], reverse=True)
        return probs

    def _pick_rf1_rf2(self, probs: List[Tuple[int, float]], main_n: int, min_diff: int = 2) -> Tuple[List[int], Optional[List[int]]]:
        """Pick the RF-1 ticket and a diversified RF-2 ticket from ranked *probs*.

        RF-1 is the top ``main_n`` numbers. RF-2 is drawn by weighted sampling
        without replacement from the top ``max(12, main_n * 3)`` numbers and
        must differ from RF-1 in at least ``min_diff`` numbers; up to 60
        attempts are made. Sampling is seeded from the ranking itself, so the
        result is deterministic for a given *probs*.

        Returns ``(sorted rf1, sorted rf2 or None)``, or ``([], None)`` when
        *probs* holds fewer than ``main_n`` entries.
        """
        if not probs or len(probs) < main_n:
            return [], None

        rf1 = [n for n, _ in probs[:main_n]]

        # diversified sampling from top pool
        pool = probs[:max(12, main_n * 3)]
        nums = [n for n, _ in pool]
        weights = [max(1e-9, p) for _, p in pool]

        seed = _md5_seed(",".join(map(str, rf1)) + "|" + ",".join(map(str, nums[:10])))
        rng = random.Random(seed)

        def sample_ticket() -> List[int]:
            chosen = []
            remaining = list(zip(nums, weights))
            for _ in range(main_n):
                if not remaining:
                    break
                total = sum(w for _, w in remaining)
                if total <= 0:
                    break
                r = rng.random() * total
                cum = 0.0
                # Default to the last item: sum() and the running total can
                # differ in the last bit, in which case the scan below never
                # reaches r and the pick belongs to the final bucket.
                pick_idx = len(remaining) - 1
                for i, (_, w) in enumerate(remaining):
                    cum += w
                    if cum >= r:
                        pick_idx = i
                        break
                n_pick, _ = remaining.pop(pick_idx)
                chosen.append(n_pick)
            return sorted(_dedupe(chosen))

        rf1_set = set(rf1)
        rf2: Optional[List[int]] = None
        for _ in range(60):
            cand = sample_ticket()
            if len(cand) != main_n:
                continue
            overlap = len(set(cand) & rf1_set)
            # require at least `min_diff` numbers different
            if overlap <= (main_n - min_diff):
                rf2 = cand
                break

        # fallback: if diversification fails, return None (honest)
        return sorted(rf1), sorted(rf2) if rf2 else None

    def predict(
        self,
        csv_path: str,
        main_max: int,
        main_n: int = 5,
        bonus_max: Optional[int] = None,
        bonus_n: int = 0,
        seed_key: str = "",
        min_diff: int = 2,
        min_draws: int = 60,
    ) -> Dict[str, Any]:
        """Return RF predictions from CSV history.

        The result dict always has the keys ``ok``, ``rf1_numbers``,
        ``rf2_numbers``, ``rf1_bonus``, ``rf2_bonus`` and ``reason``. When
        ``ok`` is False, ``reason`` names the failure (``csv_missing``,
        ``too_few_draws:N``, ``too_few_valid_draws:N``, ``rf_rank_empty`` or
        ``rf1_empty``). Bonus values are only filled in when both *bonus_max*
        and a positive *bonus_n* are given and a bonus ranking is available.
        """
        out: Dict[str, Any] = {
            "ok": False,
            "rf1_numbers": [],
            "rf2_numbers": None,
            "rf1_bonus": None,
            "rf2_bonus": None,
            "reason": "",
        }

        if not csv_path or not os.path.exists(csv_path):
            out["reason"] = "csv_missing"
            return out

        hist = self.reader.read(csv_path, main_n=main_n)
        draws = hist.draws
        if len(draws) < min_draws:
            out["reason"] = f"too_few_draws:{len(draws)}"
            return out

        # clip to valid range defensively
        draws = [_clip(_dedupe(d), 1, int(main_max))[:main_n] for d in draws if d]
        draws = [d for d in draws if len(d) == main_n]
        if len(draws) < min_draws:
            out["reason"] = f"too_few_valid_draws:{len(draws)}"
            return out

        seed = _md5_seed(seed_key or (csv_path + "|" + str(main_max) + "|" + str(main_n)))

        probs = self._rank_numbers(draws, int(main_max), seed=seed)
        if not probs:
            out["reason"] = "rf_rank_empty"
            return out

        rf1, rf2 = self._pick_rf1_rf2(probs, main_n=int(main_n), min_diff=int(min_diff))
        if not rf1:
            out["reason"] = "rf1_empty"
            return out

        out["rf1_numbers"] = rf1
        out["rf2_numbers"] = rf2
        out["ok"] = True

        # Bonus prediction (optional)
        if bonus_max and bonus_n and bonus_n > 0:
            bprobs = self._rank_bonus(hist.bonus, int(bonus_max), seed=seed)
            if bprobs:
                out["rf1_bonus"] = bprobs[0][0]
                # For bonus, "diversification" isn't meaningful; RF-2 reuses
                # the RF-1 bonus.
                out["rf2_bonus"] = out["rf1_bonus"]

        return out


# ----------------------------- CLI -----------------------------

def main():
    """Command-line entry point: parse arguments, run predict(), print the result."""
    p = argparse.ArgumentParser(description="Universal RF predictor (CSV-only).")
    p.add_argument("--csv", required=True, help="Path to draw-history CSV")
    p.add_argument("--main-max", required=True, type=int, help="Max main number (e.g., 52)")
    p.add_argument("--main-n", default=5, type=int, help="Count of main numbers per draw")
    p.add_argument("--bonus-max", default=None, type=int, help="Max bonus number (optional)")
    p.add_argument("--bonus-n", default=0, type=int, help="Bonus count (0 or 1)")
    p.add_argument("--min-draws", default=60, type=int, help="Minimum draws required")
    p.add_argument("--min-diff", default=2, type=int, help="RF-2 min number-diff vs RF-1")
    p.add_argument("--seed-key", default="", help="Seed key for reproducibility")
    p.add_argument("--verbose", action="store_true")
    args = p.parse_args()

    rf = UniversalRFPredictor(verbose=args.verbose)
    out = rf.predict(
        csv_path=args.csv,
        main_max=args.main_max,
        main_n=args.main_n,
        bonus_max=args.bonus_max,
        bonus_n=args.bonus_n,
        seed_key=args.seed_key,
        min_diff=args.min_diff,
        min_draws=args.min_draws,
    )

    print(out)
    if out.get("ok"):
        print("RF-1:", out["rf1_numbers"], "BONUS:", out.get("rf1_bonus"))
        print("RF-2:", out.get("rf2_numbers"), "BONUS:", out.get("rf2_bonus"))
    else:
        print("Not OK:", out.get("reason"))


if __name__ == "__main__":
    main()