# betting-analyst / main.py
# AKMESSI's picture
# Upload main.py
# c401430 verified
"""
European Football Analysis System - Main Entry Point
Continuously monitors matches and sends analysis 30 minutes before kickoff
"""
import asyncio
import signal
import sys
from datetime import datetime, timedelta
from typing import Dict, Any, List, Set
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
# Import modules
from config.settings import operational_config, analysis_config, TARGET_LEAGUES
from modules.data_collector import data_collector
from modules.analysis_engine import analysis_engine, MatchAnalysis
from modules.telegram_bot import (
telegram_bot, send_analysis, send_coming_alert,
send_startup, send_daily_summary, send_error, is_bot_configured
)
import logging
# Logging setup: INFO-level root configuration with a timestamped record
# layout, plus the logger used throughout this module.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
class FootballAnalysisSystem:
    """
    Main system orchestrator for continuous football match analysis.

    On a fixed interval the system fetches upcoming matches, sends an
    "analysis coming" alert shortly before the analysis window, and sends the
    full analysis to Telegram once a match enters the configured pre-match
    window.

    Attributes:
        scheduler: AsyncIOScheduler that runs the periodic jobs.
        running: True while the keep-alive loop in ``run`` should continue.
        analyzed_matches: IDs of matches whose analysis was sent successfully.
        alerted_matches: IDs of matches an "analysis coming" alert was sent for.
        stats: Counters ``analyses_sent``, ``matches_analyzed`` and ``errors``;
            zeroed by ``reset_daily_stats``.
    """

    # Inclusive bounds, in minutes before kickoff, of the "analysis coming"
    # alert window.
    ALERT_WINDOW_MIN_MINUTES = 35
    ALERT_WINDOW_MAX_MINUTES = 50
    # A match whose kickoff lies more than this many minutes in the past is
    # dropped from the tracking sets when it is seen again.
    TRACKING_EXPIRY_MINUTES = 120

    def __init__(self):
        """Create the scheduler and the empty tracking/statistics state."""
        self.scheduler = AsyncIOScheduler()
        self.running = False
        self.analyzed_matches: Set[str] = set()  # Track matches we've analyzed
        self.alerted_matches: Set[str] = set()  # Track matches we've alerted about
        self.stats = {
            'analyses_sent': 0,
            'matches_analyzed': 0,
            'errors': 0,
        }

    async def initialize(self) -> bool:
        """Log the configuration, verify Telegram setup and schedule the jobs.

        Returns:
            False when the Telegram bot is not configured (nothing is
            scheduled in that case), True otherwise.
        """
        logger.info("=" * 60)
        logger.info("FOOTBALL ANALYSIS SYSTEM")
        logger.info("=" * 60)
        logger.info(f"Check interval: {operational_config.CHECK_INTERVAL_MINUTES} minutes")
        logger.info(f"Analysis window: T-{analysis_config.ANALYSIS_WINDOW_MINUTES} minutes")
        logger.info(f"Target leagues: {list(TARGET_LEAGUES.keys())}")
        logger.info("=" * 60)

        # Check Telegram configuration
        if not is_bot_configured():
            logger.error("Telegram bot not configured! Please set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID")
            return False

        # Send startup notification
        await send_startup()

        # Schedule tasks
        self._schedule_tasks()
        logger.info("System initialized successfully")
        return True

    def _schedule_tasks(self):
        """Register the periodic jobs on the scheduler (does not start it)."""
        # Main check cycle - every CHECK_INTERVAL_MINUTES minutes.
        # max_instances=1 keeps a slow cycle from overlapping the next one.
        self.scheduler.add_job(
            self.check_matches,
            IntervalTrigger(minutes=operational_config.CHECK_INTERVAL_MINUTES),
            id='check_matches',
            replace_existing=True,
            max_instances=1,
        )

        # Daily summary - at 00:00
        self.scheduler.add_job(
            self.send_daily_summary,
            trigger='cron',
            hour=0,
            minute=0,
            id='daily_summary',
            replace_existing=True,
        )

        # Reset daily stats at 00:01, one minute after the summary job fires
        self.scheduler.add_job(
            self.reset_daily_stats,
            trigger='cron',
            hour=0,
            minute=1,
            id='reset_stats',
            replace_existing=True,
        )
        logger.info("Scheduled tasks configured")

    async def check_matches(self):
        """
        Main check cycle - runs every CHECK_INTERVAL_MINUTES minutes.

        Fetches the upcoming matches and processes each one in turn. A failure
        while processing a single match is logged and counted but does not
        stop the remaining matches from being processed; one aggregated error
        notification is sent afterwards. A failure of the cycle itself (for
        example while fetching matches) is logged, counted and reported via
        ``send_error``.
        """
        logger.info("\n" + "=" * 60)
        logger.info("CHECKING FOR MATCHES...")
        logger.info("=" * 60)
        try:
            async with data_collector as collector:
                # Fetch all matches in next 24 hours
                matches = await collector.fetch_matches_next_24h()
                if not matches:
                    logger.info("No upcoming matches found")
                    return
                logger.info(f"Found {len(matches)} upcoming matches")

                # Process each match, isolating failures so that one bad
                # entry cannot abort the rest of the cycle.
                failures = 0
                last_error = None
                for match in matches:
                    try:
                        await self._process_match(match, collector)
                    except Exception as exc:
                        failures += 1
                        last_error = exc
                        self.stats['errors'] += 1
                        logger.exception(f"Error processing match: {exc}")
                    # Small delay between matches
                    await asyncio.sleep(1)

                if failures:
                    # One notification per cycle instead of one per match.
                    await send_error(
                        f"{failures} of {len(matches)} matches failed to process; "
                        f"last error: {last_error}",
                        "Match processing",
                    )

                logger.info("=" * 60)
                logger.info(f"Check complete. Stats: {self.stats}")
                logger.info("=" * 60)
        except Exception as e:
            # logger.exception records the traceback along with the message.
            logger.exception(f"Error in check cycle: {e}")
            self.stats['errors'] += 1
            await send_error(str(e), "Main check cycle")

    async def _process_match(self, match: Dict, collector):
        """Decide what to do with a single match based on time until kickoff.

        Args:
            match: Match record; reads ``match_id``, ``home_team``,
                ``away_team`` and ``kickoff_datetime`` (a datetime; its
                ``tzinfo`` is used to obtain a comparable "now").
            collector: Active data collector, passed on to the analysis step.

        Raises:
            KeyError: If a required key is missing from ``match``.
        """
        match_id = match['match_id']
        home_team = match['home_team']
        away_team = match['away_team']
        kickoff = match['kickoff_datetime']

        # Calculate time until kickoff (int() truncates toward zero)
        now = datetime.now(kickoff.tzinfo)
        minutes_until = int((kickoff - now).total_seconds() / 60)
        logger.info(f"\n📊 {home_team} vs {away_team} - T-{minutes_until}min")

        in_analysis_window = (
            analysis_config.PRE_MATCH_WINDOW_END
            <= minutes_until
            <= analysis_config.PRE_MATCH_WINDOW_START
        )
        in_alert_window = (
            self.ALERT_WINDOW_MIN_MINUTES
            <= minutes_until
            <= self.ALERT_WINDOW_MAX_MINUTES
        )

        # The analysis window takes precedence over the alert window.
        if in_analysis_window:
            if match_id not in self.analyzed_matches:
                logger.info(" 🎯 Match in analysis window - performing deep analysis")
                await self._analyze_and_send(match, collector)
            else:
                logger.info(" ⏭️ Already analyzed this match")
        elif in_alert_window:
            # Send the "analysis coming" alert at most once per match
            if match_id not in self.alerted_matches and match_id not in self.analyzed_matches:
                logger.info(" 📢 Sending 'analysis coming' alert")
                await send_coming_alert(match, minutes_until)
                self.alerted_matches.add(match_id)
        elif minutes_until < -self.TRACKING_EXPIRY_MINUTES:
            # Clean up old matches from tracking.
            # NOTE(review): this only runs for matches the collector still
            # returns; if the feed contains upcoming matches only, the
            # tracking sets are never pruned - confirm against the collector.
            self.analyzed_matches.discard(match_id)
            self.alerted_matches.discard(match_id)

    async def _analyze_and_send(self, match: Dict, collector):
        """Collect data for a match, run the analysis and send it to Telegram.

        The match is marked as analyzed (and the counters are incremented)
        only when sending succeeds, so a failed send is retried on a later
        cycle while the match is still in the analysis window. Any exception
        is logged and counted, never propagated.

        Args:
            match: Match record; reads ``match_id``, ``home_team_id``,
                ``away_team_id`` and ``competition``.
            collector: Active data collector used for all fetches.
        """
        match_id = match['match_id']
        try:
            # Gather all data
            logger.info(" 📥 Collecting data...")
            has_team_ids = bool(match.get('home_team_id') and match.get('away_team_id'))

            # Team statistics (left empty when IDs or league are unknown)
            team_stats = {'home': {}, 'away': {}}
            if has_team_ids:
                # Get league ID from the first target league with a matching name
                league_id = next(
                    (
                        info.get('api_football_id')
                        for info in TARGET_LEAGUES.values()
                        if info['name'] == match['competition']
                    ),
                    None,
                )
                if league_id:
                    home_stats = await collector.fetch_team_statistics(
                        match['home_team_id'], league_id
                    )
                    away_stats = await collector.fetch_team_statistics(
                        match['away_team_id'], league_id
                    )
                    team_stats = {'home': home_stats, 'away': away_stats}
            # Short pauses space out the consecutive collector requests
            await asyncio.sleep(0.5)

            # Head-to-head
            h2h_data = []
            if has_team_ids:
                h2h_data = await collector.fetch_head_to_head(
                    match['home_team_id'], match['away_team_id']
                )
            await asyncio.sleep(0.5)

            # Lineups
            lineup_data = await collector.fetch_lineups(match_id)
            await asyncio.sleep(0.5)

            # Odds
            odds_data = await collector.fetch_odds(match_id)

            # Perform analysis
            logger.info(" 🔬 Performing deep analysis...")
            analysis = analysis_engine.analyze_match(
                match, team_stats, h2h_data, odds_data, lineup_data
            )

            # Send to Telegram
            logger.info(" 📤 Sending analysis to Telegram...")
            success = await send_analysis(analysis)
            if success:
                self.analyzed_matches.add(match_id)
                self.stats['analyses_sent'] += 1
                self.stats['matches_analyzed'] += 1
                logger.info(" ✅ Analysis sent successfully!")
            else:
                logger.error(" ❌ Failed to send analysis")
        except Exception as e:
            # logger.exception records the traceback along with the message.
            logger.exception(f" ❌ Error analyzing match: {e}")
            self.stats['errors'] += 1

    async def send_daily_summary(self):
        """Send the daily summary built from the current counters."""
        # Inside a method this name resolves to the module-level
        # send_daily_summary function, not to this method.
        await send_daily_summary(
            self.stats['analyses_sent'],
            self.stats['matches_analyzed'],
        )

    def reset_daily_stats(self):
        """Reset all daily counters to zero."""
        logger.info("Resetting daily statistics")
        self.stats['analyses_sent'] = 0
        self.stats['matches_analyzed'] = 0
        self.stats['errors'] = 0

    async def run(self):
        """Initialize, start the scheduler, run one check and stay alive.

        Returns early when initialization fails. Otherwise the coroutine
        sleeps in one-minute steps until ``running`` becomes False or the
        task is cancelled, and always calls ``shutdown`` on the way out.
        """
        initialized = await self.initialize()
        if not initialized:
            logger.error("Failed to initialize system")
            return

        # Start scheduler
        self.scheduler.start()
        logger.info("Scheduler started")

        # Run initial check right away instead of waiting for the first interval
        await self.check_matches()

        # Keep running
        self.running = True
        logger.info("System is running continuously...")
        try:
            while self.running:
                await asyncio.sleep(60)  # Keep alive
        except asyncio.CancelledError:
            logger.info("Received cancellation signal")
        finally:
            self.shutdown()

    def shutdown(self):
        """Shut the system down; safe to call more than once."""
        logger.info("Shutting down system...")
        self.running = False
        # Only a running scheduler is shut down, so repeated calls are harmless
        if self.scheduler.running:
            self.scheduler.shutdown()
        logger.info("Shutdown complete")
# Signal handlers
def signal_handler(signum, frame):
    """Log the delivered signal number, then exit the process with status 0.

    Args:
        signum: Number of the signal that was received.
        frame: Stack frame passed in by the signal machinery (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    message = f"Received signal {signum}"
    logger.info(message)
    # Same effect as sys.exit(0): unwind via SystemExit with code 0.
    raise SystemExit(0)
# Register the same handler for interrupt (SIGINT) and termination (SIGTERM)
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, signal_handler)
async def main():
    """Main entry point: build the analysis system and run it until it stops.

    A keyboard interrupt is logged as a normal stop. Any other exception is
    logged with its traceback. In every case the system is shut down before
    returning.
    """
    system = FootballAnalysisSystem()
    try:
        await system.run()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error message
        # would lose for an unexpected crash.
        logger.exception(f"Fatal error: {e}")
    finally:
        # run() also shuts down on its own exit path; this second call covers
        # failures raised before that point and is harmless when repeated.
        system.shutdown()
if __name__ == "__main__":
    # Executed as a script: drive the async entry point to completion.
    asyncio.run(main())