| |
| import gradio as gr |
| import numpy as np |
| import pandas as pd |
| from datetime import datetime |
| import random |
| from collections import defaultdict |
| import math |
| import os |
| import csv |
| import asyncio |
| from fastapi import FastAPI |
| from pydantic import BaseModel |
| from typing import List |
| import json |
| import uvicorn |
|
|
| |
| CONFIG = { |
| "HISTORY_LIMIT": 500, |
| "PINK_THRESHOLD": 3.0, |
| "BIG_PINK_THRESHOLD": 5.0, |
| "DATA_FILE": "collected_data.csv", |
| } |
|
|
| |
| CUSTOM_CSS = """ |
| .gradio-container { |
| background: #0a0a0f !important; |
| color: #ffffff !important; |
| font-family: 'Inter', sans-serif !important; |
| } |
| footer {visibility: hidden} |
| h1 { color: #00d4ff !important; text-align: center; margin-bottom: 20px; text-shadow: 0 0 10px #00d4ff; } |
| .gr-box { border: 1px solid #333 !important; background: rgba(255,255,255,0.05) !important; } |
| .gr-button-primary { background: linear-gradient(135deg, #00d4ff, #0088ff) !important; border: none !important; } |
| .gr-button-secondary { background: rgba(255,255,255,0.1) !important; border: 1px solid #00d4ff !important; margin-top: 20px !important; } |
| .gr-dataframe { background: rgba(255,255,255,0.05) !important; } |
| """ |
|
|
| |
| TIME_STATS = None |
| def load_time_statistics(): |
| global TIME_STATS |
| try: |
| if os.path.exists('aviator_Rounds_history_scrp.xlsx'): |
| df = pd.read_excel('aviator_Rounds_history_scrp.xlsx', sheet_name='scarping rounds crash') |
| df = df[['ROUNDS', 'TIME ROUND']].dropna() |
| df['multiplier'] = pd.to_numeric(df['ROUNDS'], errors='coerce') |
| df = df.dropna() |
| df['hour'] = pd.to_datetime(df['TIME ROUND'], format='%H:%M').dt.hour |
| stats = df.groupby('hour')['multiplier'].agg(['mean', 'std', 'count']).to_dict('index') |
| for h in range(24): |
| if h not in stats: |
| stats[h] = {'mean': 1.8, 'std': 1.0, 'count': 0} |
| TIME_STATS = stats |
| print(f"✅ সময় পরিসংখ্যান লোড হয়েছে। মোট রেকর্ড: {len(df)}") |
| else: |
| print("⚠️ এক্সেল ফাইল পাওয়া যায়নি। ডিফল্ট পরিসংখ্যান ব্যবহার করা হবে।") |
| TIME_STATS = {h: {'mean': 1.8, 'std': 1.0, 'count': 100} for h in range(24)} |
| except Exception as e: |
| print(f"ডেটা লোড করতে সমস্যা: {e}") |
| TIME_STATS = {h: {'mean': 1.8, 'std': 1.0, 'count': 100} for h in range(24)} |
|
|
| load_time_statistics() |
|
|
| |
| class StatisticalModelV1: |
| def predict(self, history): |
| recent = history[:15] |
| if len(recent) < 3: |
| return {'prediction': 1.5, 'confidence': 0.3} |
| q1, q3 = np.percentile(recent, [25, 75]) |
| iqr = q3 - q1 |
| filtered = [x for x in recent if (q1 - 1.5*iqr) <= x <= (q3 + 1.5*iqr)] |
| if len(filtered) < 3: |
| filtered = recent |
| x = np.arange(len(filtered)) |
| weights = np.linspace(1.5, 0.5, len(filtered)) |
| weights /= weights.sum() |
| weighted_mean_x = np.average(x, weights=weights) |
| weighted_mean_y = np.average(filtered, weights=weights) |
| numerator = np.sum(weights * (x - weighted_mean_x) * (filtered - weighted_mean_y)) |
| denominator = np.sum(weights * (x - weighted_mean_x)**2) |
| trend = numerator / denominator if denominator != 0 else 0 |
| prediction = np.median(filtered) + trend * 1.5 |
| cv = np.std(filtered) / (np.mean(filtered) + 0.1) |
| confidence = min(0.85, 0.5 + len(filtered)/len(recent)*0.3 - cv*0.2) |
| return {'prediction': float(prediction), 'confidence': float(confidence)} |
|
|
| |
| class StatisticalModelV2: |
| def predict(self, history): |
| timeframes = {'short': history[:5], 'medium': history[:10], 'long': history[:20]} |
| preds, confs = [], [] |
| for name, data in timeframes.items(): |
| if len(data) < 3: |
| continue |
| ma_3 = np.mean(data[:3]) if len(data)>=3 else np.mean(data) |
| ma_5 = np.mean(data[:5]) if len(data)>=5 else ma_3 |
| ema = data[0] |
| alpha = 0.3 |
| for v in data[1:]: |
| ema = alpha*v + (1-alpha)*ema |
| x = np.arange(len(data)) |
| trend = np.polyfit(x, data, 1)[0] |
| base = np.mean([ma_3, ma_5, ema]) |
| preds.append(base + trend * len(data) / 10) |
| confs.append(min(0.9, 0.5 + len(data)/40)) |
| if not preds: |
| return {'prediction': 1.5, 'confidence': 0.3} |
| weights = {'short':0.5, 'medium':0.3, 'long':0.2} |
| final_pred = 0 |
| total_weight = 0 |
| for i, name in enumerate(timeframes.keys()): |
| if i < len(preds): |
| w = weights.get(name, 0.2) * confs[i] |
| final_pred += preds[i] * w |
| total_weight += w |
| final_pred /= total_weight if total_weight else 1 |
| confidence = np.mean(confs) * 0.9 |
| return {'prediction': float(final_pred), 'confidence': float(confidence)} |
|
|
| |
| class StatisticalModelV3: |
| def detect_cycles(self, history): |
| if len(history) < 10: |
| return None |
| cycles = [] |
| for period in range(3, 7): |
| corrs = [] |
| for i in range(len(history) - period*2): |
| seg1 = history[i:i+period] |
| seg2 = history[i+period:i+period*2] |
| if len(seg1) == len(seg2): |
| corr = np.corrcoef(seg1, seg2)[0,1] |
| if not np.isnan(corr): |
| corrs.append(abs(corr)) |
| if corrs and np.mean(corrs) > 0.6: |
| cycles.append({'period': period, 'strength': float(np.mean(corrs))}) |
| return cycles if cycles else None |
| def predict(self, history): |
| recent = history[:20] |
| cycles = self.detect_cycles(recent) |
| cycle_pred = None |
| if cycles: |
| best = max(cycles, key=lambda x: x['strength']) |
| period = best['period'] |
| if len(recent) > period: |
| next_val = recent[period:period+1] |
| if next_val: |
| cycle_pred = next_val[0] * (1 + best['strength'] * 0.1) |
| base_pred = np.median(recent) |
| if cycle_pred: |
| base_pred = (base_pred + cycle_pred) / 2 |
| prediction = max(1.05, min(10000.0, base_pred)) |
| confidence = min(0.9, 0.5 + len(recent)/40 + (0.15 if cycles else 0)) |
| return {'prediction': float(prediction), 'confidence': float(confidence)} |
|
|
| |
| class StatisticalModelV4: |
| def __init__(self): |
| self.performance = [] |
| self.bias = 0 |
| self.volatility_regime = 'normal' |
| def detect_volatility(self, history): |
| if len(history) < 10: |
| return 'normal' |
| recent_vol = np.std(history[:5]) |
| long_vol = np.std(history[:20]) if len(history)>=20 else recent_vol |
| if recent_vol > long_vol * 1.5: |
| return 'high' |
| elif recent_vol < long_vol * 0.5: |
| return 'low' |
| else: |
| return 'normal' |
| def predict(self, history): |
| recent = history[:15] |
| self.volatility_regime = self.detect_volatility(history) |
| mean_val, median_val = np.mean(recent), np.median(recent) |
| x = np.arange(len(recent)) |
| weights = np.exp(-0.2 * x) |
| weights /= weights.sum() |
| weighted_mean_x = np.average(x, weights=weights) |
| weighted_mean_y = np.average(recent, weights=weights) |
| numerator = np.sum(weights * (x - weighted_mean_x) * (recent - weighted_mean_y)) |
| denominator = np.sum(weights * (x - weighted_mean_x)**2) |
| trend = numerator / denominator if denominator != 0 else 0 |
| preds = {'mean': mean_val, 'median': median_val, 'trend': median_val + trend * len(recent) * 0.5} |
| w = {'mean': 0.3, 'median': 0.4, 'trend': 0.3} |
| if self.volatility_regime == 'high': |
| w['median'] *= 1.5 |
| elif self.volatility_regime == 'low': |
| w['trend'] *= 1.3 |
| total = sum(w.values()) |
| for k in w: |
| w[k] /= total |
| prediction = sum(preds[k] * w[k] for k in preds) + self.bias |
| confidence = 0.5 + len(recent)/30 |
| if self.volatility_regime == 'high': |
| confidence *= 0.8 |
| elif self.volatility_regime == 'low': |
| confidence *= 1.2 |
| if self.performance: |
| recent_perf = np.mean(self.performance[-10:]) if len(self.performance)>=10 else np.mean(self.performance) |
| confidence *= (1 + recent_perf * 0.1) |
| confidence = min(0.9, confidence) |
| return {'prediction': float(max(1.05, min(10000.0, prediction))), 'confidence': float(confidence)} |
| def update(self, actual, predicted): |
| error = abs(actual - predicted) / actual |
| acc = max(0, 1 - error) |
| self.performance.append(acc) |
| if len(self.performance) > 100: |
| self.performance = self.performance[-100:] |
| self.bias += (actual - predicted) * 0.01 |
|
|
| |
| class StatisticalModelV5: |
| def __init__(self): |
| self.n_estimators = 10 |
| def predict(self, history): |
| if len(history) < 10: |
| return {'prediction': 1.5, 'confidence': 0.5} |
| recent = history[:10] |
| trees = [] |
| for _ in range(self.n_estimators): |
| idx = np.random.choice(len(recent), size=len(recent), replace=True) |
| sample = [recent[i] for i in idx] |
| if np.random.random() > 0.5: |
| trees.append(np.mean(sample)) |
| else: |
| trees.append(np.median(sample)) |
| pred = float(np.mean(trees)) |
| return {'prediction': pred, 'confidence': 0.7} |
|
|
| |
| class StatisticalModelV6: |
| def __init__(self, time_stats): |
| self.time_stats = time_stats |
| def predict(self, history, current_hour=None): |
| if current_hour is None: |
| current_hour = datetime.now().hour |
| stats = self.time_stats.get(current_hour, {'mean': 1.8, 'std': 1.0}) |
| base_pred = np.median(history[:5]) if len(history)>=5 else 1.5 |
| alpha = 0.3 |
| prediction = base_pred * (1 - alpha) + stats['mean'] * alpha |
| confidence = min(0.85, 0.5 + stats.get('count', 100) / 500) |
| return {'prediction': float(prediction), 'confidence': float(confidence), 'hour': current_hour} |
|
|
| |
| class NeuralNetwork: |
| def __init__(self): |
| self.weights = { |
| 'input': np.random.randn(15) * 0.1, |
| 'hidden': np.random.randn(10) * 0.1, |
| 'output': np.random.randn(5) * 0.1 |
| } |
| self.performance_history = [] |
| def extract_features(self, history): |
| recent = history[:12] |
| features = [] |
| for val in recent: |
| features.append(math.log(val + 0.1) / math.log(10)) |
| mean_val = np.mean(recent) if recent else 1.5 |
| std_val = np.std(recent) if recent else 0.2 |
| features.append(mean_val) |
| features.append(std_val / (mean_val + 0.1)) |
| if len(recent) >= 3: |
| trend = (recent[0] - recent[-1]) / len(recent) |
| features.append(trend) |
| else: |
| features.append(0) |
| pink_count = sum(1 for v in recent if v >= CONFIG["PINK_THRESHOLD"]) |
| features.append(pink_count / len(recent) if recent else 0) |
| while len(features) < 15: |
| features.append(0) |
| return np.array(features[:15]) |
| def predict(self, history): |
| if len(history) < 5: |
| return {'prediction': 1.5, 'confidence': 0.3} |
| features = self.extract_features(history) |
| hidden = np.tanh(np.dot(features, self.weights['input'][:len(features)])) |
| output = np.tanh(hidden * np.mean(self.weights['hidden'])) |
| prediction = 1.5 + (output * 3.0) |
| prediction = max(1.05, min(10000.0, prediction)) |
| confidence = min(0.9, 0.5 + (len(history) / 200) + abs(output) * 0.2) |
| return {'prediction': float(prediction), 'confidence': float(confidence)} |
|
|
| class SequenceAnalyzer: |
| def __init__(self): |
| self.max_pattern_length = 6 |
| def find_patterns(self, history): |
| patterns = [] |
| for length in range(2, min(self.max_pattern_length, len(history) // 2)): |
| for i in range(len(history) - length * 2): |
| pattern = history[i:i+length] |
| next_seq = history[i+length:i+length*2] |
| similarity = self.calculate_similarity(pattern, next_seq) |
| if similarity > 0.6: |
| patterns.append({'pattern': pattern, 'next': next_seq, 'similarity': similarity, 'length': length}) |
| return patterns |
| def calculate_similarity(self, seq1, seq2): |
| if len(seq1) != len(seq2) or len(seq1) == 0: |
| return 0 |
| diffs = [abs(seq1[i] - seq2[i]) / (max(seq1[i], seq2[i]) + 0.1) for i in range(len(seq1))] |
| avg_diff = np.mean(diffs) if diffs else 1 |
| return max(0, 1 - avg_diff) |
| def predict(self, history): |
| if len(history) < 4: |
| return {'prediction': 1.5, 'confidence': 0.3} |
| patterns = self.find_patterns(history) |
| if not patterns: |
| return {'prediction': 1.5, 'confidence': 0.4} |
| best = max(patterns, key=lambda p: p['similarity'] * p['length']) |
| trend = (best['pattern'][-1] - best['pattern'][0]) / len(best['pattern']) |
| prediction = best['pattern'][-1] + trend |
| prediction = max(1.05, min(10000.0, prediction)) |
| confidence = best['similarity'] * 0.8 |
| return {'prediction': float(prediction), 'confidence': float(confidence)} |
|
|
| class MarkovChain: |
| def __init__(self): |
| self.transition_matrix = defaultdict(lambda: defaultdict(float)) |
| self.states = ['very_low', 'low', 'medium', 'high', 'pink'] |
| def discretize(self, value): |
| if value < 1.3: |
| return 'very_low' |
| elif value < 1.8: |
| return 'low' |
| elif value < 2.5: |
| return 'medium' |
| elif value < CONFIG["PINK_THRESHOLD"]: |
| return 'high' |
| else: |
| return 'pink' |
| def build_model(self, history): |
| self.transition_matrix.clear() |
| for i in range(len(history) - 1): |
| current = self.discretize(history[i]) |
| next_state = self.discretize(history[i+1]) |
| self.transition_matrix[current][next_state] += 1 |
| for state in self.transition_matrix: |
| total = sum(self.transition_matrix[state].values()) |
| if total > 0: |
| for next_state in self.transition_matrix[state]: |
| self.transition_matrix[state][next_state] /= total |
| def predict(self, history): |
| if len(history) < 2: |
| return {'prediction': 1.5, 'confidence': 0.3} |
| self.build_model(history) |
| current_state = self.discretize(history[0]) |
| probs = self.transition_matrix.get(current_state, {}) |
| if not probs: |
| probs = {'very_low':0.2, 'low':0.4, 'medium':0.25, 'high':0.1, 'pink':0.05} |
| state_values = {'very_low':1.15, 'low':1.5, 'medium':2.2, 'high':2.8, 'pink':4.5} |
| prediction = sum(state_values[s] * probs.get(s,0) for s in self.states) / (sum(probs.values()) or 1) |
| confidence = max(probs.values()) * 0.9 if probs else 0.3 |
| return {'prediction': float(prediction), 'confidence': float(confidence)} |
|
|
| class StatisticalPredictor: |
| def predict(self, history): |
| recent = history[:15] |
| mean_val = np.mean(recent) |
| median_val = np.median(recent) |
| x = np.arange(len(recent)) |
| trend = np.polyfit(x, recent, 1)[0] if len(recent) > 1 else 0 |
| std_val = np.std(recent) |
| prediction = median_val + trend * 1.5 |
| if std_val > 1.0: |
| prediction += random.uniform(-0.5, 0.5) |
| prediction = max(1.05, min(10000.0, prediction)) |
| confidence = min(0.8, 0.5 + (len(history)/200) - (std_val/10)) |
| return {'prediction': float(prediction), 'confidence': float(confidence)} |
|
|
| class RepositoryEnsemble: |
| def __init__(self): |
| self.models = { |
| 'neural': NeuralNetwork(), |
| 'sequence': SequenceAnalyzer(), |
| 'markov': MarkovChain(), |
| 'stat': StatisticalPredictor() |
| } |
| self.weights = {'neural':0.35, 'sequence':0.30, 'markov':0.20, 'stat':0.15} |
| self.performance = defaultdict(list) |
| def predict(self, history): |
| if len(history) < 5: |
| return {'prediction': 1.5, 'confidence': 0.3} |
| preds = {} |
| confs = {} |
| for name, model in self.models.items(): |
| res = model.predict(history) |
| preds[name] = res['prediction'] |
| confs[name] = res['confidence'] |
| total_weight = 0 |
| weighted_sum = 0 |
| for name, pred in preds.items(): |
| w = self.weights.get(name, 0.2) * confs[name] |
| weighted_sum += pred * w |
| total_weight += w |
| final_pred = weighted_sum / total_weight if total_weight > 0 else 1.5 |
| final_pred = max(1.05, min(10000.0, final_pred)) |
| confidence = np.mean(list(confs.values())) * 0.9 |
| return {'prediction': float(final_pred), 'confidence': float(confidence)} |
|
|
| |
| class StrategyManager: |
| def __init__(self, models_dict): |
| self.models = models_dict |
| self.performance_history = {name: [] for name in self.models} |
| self.best_strategy = None |
| self.best_score = 0 |
| self.window_size = 20 |
|
|
| def register_prediction(self, model_name, predicted, actual): |
| error = abs(actual - predicted) / actual |
| accuracy = max(0, 1 - error) |
| self.performance_history[model_name].append(accuracy) |
| if len(self.performance_history[model_name]) > 100: |
| self.performance_history[model_name] = self.performance_history[model_name][-100:] |
|
|
| def get_best_strategy(self): |
| best_name = None |
| best_avg = -1 |
| report = {} |
| for name, perf_list in self.performance_history.items(): |
| if len(perf_list) >= self.window_size: |
| recent = perf_list[-self.window_size:] |
| avg_acc = np.mean(recent) |
| elif len(perf_list) > 0: |
| avg_acc = np.mean(perf_list) |
| else: |
| avg_acc = 0 |
| report[name] = avg_acc |
| if avg_acc > best_avg: |
| best_avg = avg_acc |
| best_name = name |
| self.best_strategy = best_name |
| self.best_score = best_avg |
| return best_name, best_avg, report |
|
|
| |
| class EnsemblePredictorV7: |
| def __init__(self, time_stats): |
| self.models = { |
| 'v1': StatisticalModelV1(), |
| 'v2': StatisticalModelV2(), |
| 'v3': StatisticalModelV3(), |
| 'v4': StatisticalModelV4(), |
| 'v5': StatisticalModelV5(), |
| 'v6': StatisticalModelV6(time_stats), |
| 'repo': RepositoryEnsemble() |
| } |
| self.ensemble_weights = {'v1':0.15, 'v2':0.15, 'v3':0.1, 'v4':0.1, 'v5':0.1, 'v6':0.1, 'repo':0.3} |
| self.performance = defaultdict(list) |
| self.strategy_manager = StrategyManager(self.models) |
| self.last_predictions = {} |
|
|
| def predict(self, history): |
| if len(history) < 5: |
| return self._default_prediction(f"মাত্র {len(history)}টি রাউন্ড, ৫টি প্রয়োজন") |
| |
| current_hour = datetime.now().hour |
| preds = {} |
| confs = {} |
| self.last_predictions.clear() |
| |
| for name, model in self.models.items(): |
| if name == 'v6': |
| res = model.predict(history, current_hour) |
| else: |
| res = model.predict(history) |
| preds[name] = res['prediction'] |
| confs[name] = res.get('confidence', 0.5) |
| self.last_predictions[name] = res['prediction'] |
| |
| |
| for name in self.ensemble_weights: |
| if name in self.performance and self.performance[name]: |
| recent_acc = np.mean(self.performance[name][-20:]) if len(self.performance[name])>=20 else np.mean(self.performance[name]) |
| self.ensemble_weights[name] = 0.1 + recent_acc * 0.8 |
| |
| total = sum(self.ensemble_weights.values()) |
| for name in self.ensemble_weights: |
| self.ensemble_weights[name] /= total |
| |
| final_pred = 0 |
| total_weight = 0 |
| for name, pred in preds.items(): |
| weight = self.ensemble_weights.get(name, 0.2) * confs[name] |
| final_pred += pred * weight |
| total_weight += weight |
| |
| final_pred /= total_weight if total_weight else 1 |
| |
| |
| recent = history[:10] |
| vol = np.std(recent) / (np.mean(recent)+0.1) if recent else 0.3 |
| if vol > 0.5: |
| state = "অস্থির 🌪️" |
| elif vol < 0.2: |
| state = "স্থিতিশীল ✨" |
| else: |
| state = "সাধারণ ➡️" |
| |
| confidence = np.mean(list(confs.values())) * 0.9 |
| if vol < 0.2: |
| confidence *= 1.1 |
| elif vol > 0.5: |
| confidence *= 0.9 |
| confidence = min(0.95, confidence) |
| |
| all_preds = list(preds.values()) |
| std = np.std(all_preds) if len(all_preds)>1 else 0.2 |
| spread = std * (2 - confidence) |
| spread = max(0.1, min(1.5, spread)) |
| interval = (max(1.01, final_pred - spread/2), final_pred + spread/2) |
| |
| if final_pred > 3.0: |
| decision = "বড় 🚀" |
| elif final_pred > 1.8: |
| decision = "মাঝারি 💪" |
| else: |
| decision = "ছোট 🎯" |
| |
| hour_stats = TIME_STATS.get(current_hour, {'mean':1.8, 'count':0}) |
| time_info = f"বর্তমান ঘণ্টা: {current_hour}:00 – ঐতিহাসিক গড়: {hour_stats['mean']:.2f}x (ডাটা: {hour_stats['count']}টি)" |
| |
| best_name, best_score, _ = self.strategy_manager.get_best_strategy() |
| if best_name: |
| strategy_line = f"\n🏆 **বর্তমান সেরা কৌশল**: `{best_name}` (নির্ভুলতা: {best_score*100:.1f}%)" |
| else: |
| strategy_line = "\n🏆 **বর্তমান সেরা কৌশল**: পর্যাপ্ত ডাটা নেই" |
| |
| summary = ( |
| f"🎯 **প্রেডিকশন ইন্টারভ্যাল**: {interval[0]:.2f}x – {interval[1]:.2f}x\n" |
| f"📊 **এক্সপেক্টেড মাল্টিপ্লায়ার**: {final_pred:.2f}x\n" |
| f"📈 **কনফিডেন্স**: {confidence*100:.1f}%\n" |
| f"⚡ **মার্কেট স্টেট**: {state}\n" |
| f"🎲 **ডিসিশন**: {decision}\n" |
| f"⏰ **টাইম ফিচার**: {time_info}\n" |
| f"📌 **ডাটা পয়েন্ট**: {len(history)}টি রাউন্ড" |
| f"{strategy_line}" |
| ) |
| |
| return { |
| 'summary': summary, |
| 'prediction': final_pred, |
| 'interval': interval, |
| 'confidence': confidence, |
| 'decision': decision, |
| 'analysis': state, |
| 'hour': current_hour |
| } |
| |
| def _default_prediction(self, msg): |
| return { |
| 'summary': f"⚠️ {msg}\n\n📊 ডিফল্ট প্রেডিকশন: 1.50x (কনফিডেন্স 30%)", |
| 'prediction': 1.5, |
| 'interval': (1.3, 1.7), |
| 'confidence': 0.3, |
| 'decision': 'ছোট 🎯', |
| 'analysis': 'অপ্রতুল ডাটা' |
| } |
| |
| def update_performance(self, actual_value): |
| for name, predicted in self.last_predictions.items(): |
| error = abs(actual_value - predicted) / actual_value |
| acc = max(0, 1 - error) |
| self.performance[name].append(acc) |
| if len(self.performance[name]) > 100: |
| self.performance[name] = self.performance[name][-100:] |
| self.strategy_manager.register_prediction(name, predicted, actual_value) |
|
|
| |
| class AviatorPredictorApp: |
| def __init__(self): |
| self.history = [] |
| self.model = EnsemblePredictorV7(TIME_STATS) |
| self.last_prediction = None |
| self.load_history_from_csv() |
| |
| def load_history_from_csv(self): |
| if os.path.exists(CONFIG["DATA_FILE"]): |
| try: |
| df = pd.read_csv(CONFIG["DATA_FILE"]) |
| if 'multiplier' in df.columns: |
| multipliers = df['multiplier'].dropna().tolist() |
| self.history = multipliers[-CONFIG["HISTORY_LIMIT"]:] |
| print(f"✅ CSV থেকে {len(self.history)}টি রাউন্ড লোড করা হয়েছে") |
| except Exception as e: |
| print(f"CSV লোড করতে সমস্যা: {e}") |
| |
| def save_multiplier_to_csv(self, multiplier): |
| file_exists = os.path.exists(CONFIG["DATA_FILE"]) |
| with open(CONFIG["DATA_FILE"], mode='a', newline='', encoding='utf-8') as f: |
| writer = csv.writer(f) |
| if not file_exists: |
| writer.writerow(['timestamp', 'multiplier']) |
| writer.writerow([datetime.now().isoformat(), multiplier]) |
| |
| def add_round(self, multiplier): |
| if multiplier <= 0: |
| return self.get_all_outputs(error="ইনভ্যালিড মাল্টিপ্লায়ার (১.০ এর বেশি দিন)") |
| |
| multiplier = float(multiplier) |
| self.history.insert(0, multiplier) |
| |
| if len(self.history) > CONFIG["HISTORY_LIMIT"]: |
| self.history = self.history[:CONFIG["HISTORY_LIMIT"]] |
| |
| self.save_multiplier_to_csv(multiplier) |
| |
| if self.last_prediction is not None: |
| self.model.update_performance(multiplier) |
| |
| pred_result = self.model.predict(self.history) |
| self.last_prediction = pred_result['prediction'] |
| |
| return self.get_all_outputs() |
| |
| def reset(self): |
| self.history = [] |
| for _ in range(20): |
| self.history.append(round(random.uniform(1.0, 3.5), 2)) |
| self.history.sort(reverse=True) |
| self.last_prediction = None |
| return self.get_all_outputs() |
| |
| def get_all_outputs(self, error=None): |
| if error: |
| table = [[i+1, "?.??x"] for i in range(min(20, len(self.history)))] or [[1, "1.00x"]] |
| return [table, f"⚠️ {error}"] |
| |
| pred_result = self.model.predict(self.history) |
| table = [[i+1, f"{val:.2f}x"] for i, val in enumerate(self.history[:50])] |
| return [table, pred_result['summary']] |
| |
| def get_stats(self): |
| if os.path.exists(CONFIG["DATA_FILE"]): |
| try: |
| df = pd.read_csv(CONFIG["DATA_FILE"]) |
| return { |
| "total_rounds": len(df), |
| "last_round": float(df['multiplier'].iloc[-1]) if len(df) > 0 else None, |
| "max_multiplier": float(df['multiplier'].max()) if len(df) > 0 else None, |
| "min_multiplier": float(df['multiplier'].min()) if len(df) > 0 else None, |
| "avg_multiplier": float(df['multiplier'].mean()) if len(df) > 0 else None |
| } |
| except: |
| return {"error": "Could not read stats"} |
| return {"total_rounds": 0} |
|
|
| |
| app = AviatorPredictorApp() |
|
|
| |
| with gr.Blocks(title="AVOLD V7 Predictor", theme='default') as demo: |
| gr.HTML(""" |
| <div style="text-align: center; margin-bottom: 20px;"> |
| <h1 style="color: #00d4ff; font-size: 48px; margin: 0;">✈️ AVOLD V7</h1> |
| <p style="color: #888; font-size: 14px;">হাইব্রিড এনসেম্বল + DOM অটো ডাটা কালেকশন</p> |
| </div> |
| """) |
| |
| with gr.Row(): |
| inp = gr.Number(label="নতুন মাল্টিপ্লায়ার", value=1.0, step=0.1, minimum=1.0) |
| add_btn = gr.Button("➕ যোগ করুন", variant="primary") |
| |
| prediction_box = gr.Textbox(label="🧠 প্রেডিকশন রিপোর্ট", lines=12, interactive=False) |
| rounds_table = gr.Dataframe(label="📜 শেষ ৫০ রাউন্ড", headers=["রাউন্ড", "মাল্টিপ্লায়ার"], row_count=10) |
| reset_btn = gr.Button("🔄 রিসেট ডাটা", variant="secondary") |
| |
| with gr.Row(): |
| stats_btn = gr.Button("📊 পরিসংখ্যান দেখুন", variant="secondary") |
| stats_box = gr.Textbox(label="📈 ডাটা পরিসংখ্যান", lines=5, interactive=False) |
| |
| add_btn.click( |
| fn=app.add_round, |
| inputs=inp, |
| outputs=[rounds_table, prediction_box] |
| ) |
| |
| reset_btn.click( |
| fn=app.reset, |
| outputs=[rounds_table, prediction_box] |
| ) |
| |
| stats_btn.click( |
| fn=lambda: json.dumps(app.get_stats(), indent=2), |
| outputs=stats_box |
| ) |
| |
| demo.load( |
| fn=app.get_all_outputs, |
| outputs=[rounds_table, prediction_box] |
| ) |
|
|
| |
| from fastapi import FastAPI, Request |
| from fastapi.responses import JSONResponse |
|
|
| fast_app = FastAPI() |
|
|
| class BatchData(BaseModel): |
| rounds: List[float] |
| timestamp: str = None |
|
|
| @fast_app.post("/api/add_batch") |
| async def add_batch(data: BatchData): |
| """Tampermonkey স্ক্রিপ্ট থেকে ব্যাচ আকারে ডাটা গ্রহণ করে""" |
| try: |
| await asyncio.sleep(0.2) |
| for mult in data.rounds: |
| app.add_round(mult) |
| return JSONResponse(content={"status": "ok", "received": len(data.rounds)}) |
| except Exception as e: |
| return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) |
|
|
| @fast_app.get("/api/stats") |
| async def get_stats(): |
| """সংগৃহীত ডাটার পরিসংখ্যান দেখায়""" |
| try: |
| return JSONResponse(content=app.get_stats()) |
| except Exception as e: |
| return JSONResponse(content={"error": str(e)}, status_code=500) |
|
|
| |
| fast_app = gr.mount_gradio_app(fast_app, demo, path="/") |
|
|
| |
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(fast_app, host="0.0.0.0", port=7860) |