Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import random | |
| from collections import Counter | |
| from datetime import timedelta | |
def clean_powerball_df(raw_df):
    """Return a copy of the raw results without "Double Play" rows.

    Rows whose ``DrawDate`` text contains "Double Play" are dropped (missing
    ``DrawDate`` values are kept), and a ``Date`` column holding the parsed
    ``DrawDate`` is added. The input frame is not modified.
    """
    is_double_play = raw_df["DrawDate"].str.contains("Double Play", na=False)
    cleaned = raw_df[~is_double_play].copy()
    cleaned["Date"] = pd.to_datetime(cleaned["DrawDate"])
    return cleaned
def is_sum_in_range(numbers, min_sum, max_sum):
    """Tell whether the sum of ``numbers`` lies in ``[min_sum, max_sum]`` (inclusive)."""
    return min_sum <= sum(numbers) <= max_sum
def pb_predict_star_ball(df, star_probs=None):
    """Pick a Powerball (1-26) by weighted random choice.

    The weight of each ball blends three sources, each first rescaled so its
    values sum to 1 (otherwise raw draw counts would swamp the probabilities):

    * 60% - share of all draws in ``df`` that produced the ball,
    * 20% - share of the draws from the last 30 days (relative to the newest
      ``Date`` in ``df``),
    * 20% - ``star_probs``, an optional ``{ball: score}`` mapping.

    Args:
        df: DataFrame with a datetime ``Date`` column and a ``PB`` column whose
            values are convertible to ``int``.
        star_probs: Optional mapping of ball number to an externally supplied
            score or probability.

    Returns:
        The chosen ball as a plain ``int``. When no source carries any weight
        (for example an empty ``df`` and no ``star_probs``) the ball is drawn
        uniformly instead of raising.
    """
    star_balls = list(range(1, 27))  # Powerball's star ball is 1-26

    def shares(scores):
        # Rescale to a distribution; a source without positive mass adds nothing.
        total = float(sum(scores.values()))
        if total <= 0:
            return {}
        return {int(ball): score / total for ball, score in scores.items()}

    all_balls = df["PB"].astype(int)
    if all_balls.empty:
        # Avoid date arithmetic on an empty frame; there is nothing recent.
        recent_balls = all_balls
    else:
        cutoff_sb = df["Date"].max() - timedelta(days=30)
        recent_balls = all_balls[df["Date"] >= cutoff_sb]

    share_all = shares(Counter(all_balls.tolist()))
    share_recent = shares(Counter(recent_balls.tolist()))
    share_external = shares(star_probs or {})

    weights = [
        share_all.get(sb, 0.0) * 0.6
        + share_recent.get(sb, 0.0) * 0.2
        + share_external.get(sb, 0.0) * 0.2
        for sb in star_balls
    ]

    if sum(weights) <= 0:
        # random.choices rejects an all-zero weight vector.
        return random.choice(star_balls)
    return random.choices(star_balls, weights=weights, k=1)[0]
def weighted_choice(counter_dict, k=1):
    """Sample ``k`` keys (with replacement) using the mapping's values as weights.

    Returns a list of length ``k``. An empty mapping raises ``ValueError``
    from the unpacking below.
    """
    population, weight_values = zip(*counter_dict.items())
    return random.choices(population, weights=weight_values, k=k)
def get_ml_number_probs(df):
    """Estimate main-number probabilities for the weekday of the latest draw.

    A one-hot-encoded random forest is fitted on one training example per
    drawn ball, with the draw's weekday as the only feature and the ball
    number as the label. The class probabilities predicted for the weekday of
    the most recent draw are returned.

    Args:
        df: DataFrame with a ``DrawDate`` column parseable by
            ``pd.to_datetime`` and main-ball columns ``'1'``..``'5'`` whose
            values are convertible to ``int``. The frame is not modified.

    Returns:
        ``{number: probability}`` with plain ``int`` keys and ``float``
        values, covering the numbers that occur in ``df``. An empty frame
        yields ``{}``.
    """
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.pipeline import make_pipeline
    import numpy as np

    if df.empty:
        return {}

    ball_columns = ['1', '2', '3', '4', '5']

    # Work on local copies so the caller's DataFrame gains no extra columns.
    draw_dates = pd.to_datetime(df["DrawDate"])
    weekdays = draw_dates.dt.dayofweek.to_numpy()
    numbers = df[ball_columns].astype(int).to_numpy()

    # Long format: one (weekday, number) pair per ball, row by row.
    X = pd.DataFrame({"DayOfWeek": np.repeat(weekdays, len(ball_columns))})
    y = numbers.ravel()

    model = make_pipeline(
        OneHotEncoder(),
        RandomForestClassifier(n_estimators=100, random_state=42)
    )
    model.fit(X, y)

    # The weekday is the only feature, so a single query row is enough.
    latest_day = draw_dates.max().dayofweek
    probs = model.predict_proba(pd.DataFrame({"DayOfWeek": [latest_day]}))[0]

    return {int(number): float(prob) for number, prob in zip(model.classes_, probs)}
def _pb_shares(scores):
    """Rescale a ``{number: score}`` mapping so its values sum to 1 (``{}`` if no positive total)."""
    total = float(sum(scores.values()))
    if total <= 0:
        return {}
    return {int(num): score / total for num, score in scores.items()}


def _pb_pick_adjacent_pair(weights, tries=5, sample_size=20):
    """Return a random consecutive pair ``[n, n + 1]`` found in weighted samples, or ``[]``."""
    for _ in range(tries):
        sample = sorted(set(weighted_choice(weights, sample_size)))
        pairs = [[low, high] for low, high in zip(sample, sample[1:]) if low + 1 == high]
        if pairs:
            return random.choice(pairs)
    return []


def _pb_rebalance_parity(selected, candidate_weights):
    """Replace surplus-parity numbers in place until the even/odd split is 2-3 or 3-2."""
    while True:
        even_count = sum(1 for n in selected if n % 2 == 0)
        odd_count = len(selected) - even_count
        if even_count in (2, 3) and odd_count in (2, 3):
            return
        # Too many evens -> bring in an odd number, and vice versa.
        target_parity = 1 if (even_count > 3 or odd_count < 2) else 0
        replacements = {
            n: w for n, w in candidate_weights.items()
            if w > 0 and n % 2 == target_parity and n not in selected
        }
        surplus_positions = [i for i, n in enumerate(selected) if n % 2 != target_parity]
        if not replacements or not surplus_positions:
            return  # nothing left to swap in; keep the pick as it is
        # Swap the latest pick first so the last-draw number and the
        # consecutive pair (placed earlier in the list) survive when possible.
        selected[surplus_positions[-1]] = weighted_choice(replacements)[0]


def _pb_draw_candidate(last_draw, freq_all, base_weights, allow_sequences):
    """Build one unsorted five-number candidate from the precomputed statistics."""
    # First number: one of the previous draw's numbers, weighted by its
    # all-time frequency (any historic number if none of them is known).
    first_pool = {n: freq_all[n] for n in last_draw if n in freq_all}
    selected = [weighted_choice(first_pool or freq_all)[0]]

    pool = {n: w for n, w in base_weights.items() if n not in selected}

    if allow_sequences:
        seq_pair = _pb_pick_adjacent_pair(pool)
        selected += seq_pair
        for n in seq_pair:
            pool.pop(n, None)

    # Fill up to five numbers; the pool never contains an already chosen number.
    while len(selected) < 5:
        pick = weighted_choice(pool)[0]
        pool.pop(pick)
        selected.append(pick)

    _pb_rebalance_parity(selected, pool)
    return selected


def generate_pb_prediction(df, allow_sequences=True):
    """Generate five Powerball main numbers (1-69) from draw statistics.

    Each candidate is built as follows:

    1. one number from the most recent draw, weighted by all-time frequency;
    2. optionally a pair of consecutive numbers found among weighted samples;
    3. weighted picks until five numbers are chosen;
    4. a parity fix so the pick holds 2-3 even and 2-3 odd numbers.

    Steps 2-4 use a weight per number that blends the all-time frequency
    share (60%), the share within the last 30 days (20%) and the
    probabilities from ``get_ml_number_probs`` (20%). Each source is rescaled
    to sum to 1 before blending so the percentages apply as stated.

    Candidates are redrawn until their sum lies in 65..265. The frequencies
    and the ML probabilities are computed once and reused across redraws.

    Args:
        df: DataFrame with a datetime ``Date`` column, a ``DrawDate`` column
            (required by ``get_ml_number_probs``) and main-ball columns
            ``'1'``..``'5'`` convertible to ``int``.
        allow_sequences: Whether to try to include a consecutive pair.

    Returns:
        The accepted numbers as a sorted list of plain ``int``. The accepted
        pick is also printed.

    Raises:
        RuntimeError: If no candidate passes the sum check within the attempt
            limit (the previous unbounded recursion ended in
            ``RecursionError``, a ``RuntimeError`` subclass).
    """
    NUMBER_RANGE = range(1, 70)  # Powerball main-number range
    BALL_COLUMNS = ['1', '2', '3', '4', '5']
    MAX_ATTEMPTS = 1000

    df = df.sort_values("Date", ascending=False)
    last_draw = df.iloc[0][BALL_COLUMNS].astype(int).tolist()

    # Plain ints everywhere, so keys match NUMBER_RANGE and results stay ints.
    balls = df[BALL_COLUMNS].astype(int)
    freq_all = Counter(balls.to_numpy().ravel().tolist())

    cutoff = df["Date"].max() - timedelta(days=30)
    recent_balls = balls[df["Date"] >= cutoff]
    freq_recent = Counter(recent_balls.to_numpy().ravel().tolist())

    # WEIGHTED NUMBER POOL (ALL FREQUENCY 60% - LAST 30 DAYS 20% - MACHINE LEARNING 20%)
    share_all = _pb_shares(freq_all)
    share_recent = _pb_shares(freq_recent)
    share_ml = _pb_shares(get_ml_number_probs(df))
    base_weights = {
        num: (
            share_all.get(num, 0.0) * 0.6
            + share_recent.get(num, 0.0) * 0.2
            + share_ml.get(num, 0.0) * 0.2
        )
        for num in NUMBER_RANGE
    }

    for _ in range(MAX_ATTEMPTS):
        selected = _pb_draw_candidate(last_draw, freq_all, base_weights, allow_sequences)
        if is_sum_in_range(selected, 65, 265):  # Powerball sum range
            result = sorted(selected)
            print("✅ Final selected:", result)
            return result

    raise RuntimeError(
        f"no candidate with a sum in 65..265 after {MAX_ATTEMPTS} attempts"
    )
def get_hot_and_cold_numbers(df, top_n=10):
    """Return the most and least frequently drawn main numbers.

    Every number in the Powerball range 1-69 is ranked, so numbers that never
    occur in ``df`` count as drawn 0 times and can appear among the cold ones.

    Args:
        df: DataFrame with main-ball columns ``'1'``..``'5'``. Missing values
            are ignored; other values are converted to ``int``.
        top_n: How many numbers to report on each side. Values <= 0 give two
            empty lists.

    Returns:
        ``(hot, cold)`` - two lists of ``(number, count)`` tuples. ``hot`` is
        ordered from most to least frequent, ``cold`` from least to most.
    """
    NUMBER_RANGE = range(1, 70)  # Powerball: 1-69
    top_n = max(top_n, 0)

    flat_all = df[['1', '2', '3', '4', '5']].to_numpy().ravel()
    freq_all = Counter(int(value) for value in flat_all if pd.notna(value))
    for num in NUMBER_RANGE:
        freq_all.setdefault(num, 0)  # never-drawn numbers are the coldest

    # Stable sort: ties keep first-seen order, unseen numbers follow ascending.
    ranked = sorted(freq_all.items(), key=lambda item: item[1], reverse=True)

    hot = ranked[:top_n]
    # Explicit start index: ``ranked[-0:]`` would return the whole list.
    coldest = ranked[max(len(ranked) - top_n, 0):]
    cold = sorted(coldest, key=lambda item: item[1])  # least frequent first
    return hot, cold
if __name__ == "__main__":
    # Load the raw results export and drop the rows that contain "Double Play".
    raw_results = pd.read_csv("../data/pb_results.csv")
    draws = clean_powerball_df(raw_results)

    # Main numbers first (the generator prints its own pick), then the star ball.
    main_numbers = generate_pb_prediction(draws)
    predicted_star_ball = pb_predict_star_ball(draws)
    print(f"🌟 Predicted Star Ball: {predicted_star_ball}")