"""
APScheduler-based daily automation.
Runs the data fetch + AI pipeline at a configured time each day.
"""
import logging
from datetime import datetime
from typing import Optional

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz

from app.settings import get_settings
from app.lock import PipelineLock

logger = logging.getLogger(__name__)

# Global scheduler instance
_scheduler: Optional[BackgroundScheduler] = None


def run_daily_pipeline():
    """
    Execute the daily pipeline:
    1. Fetch news and prices
    2. Score sentiment
    3. Aggregate daily sentiment
    4. Generate fresh analysis snapshot

    Uses a pipeline lock to prevent concurrent runs.
    Records metrics to the PipelineRunMetrics table for monitoring.
    """
    import json
    import uuid
    from datetime import timezone as tz

    logger.info("Starting daily pipeline run...")

    # Generate a unique run ID and record the start time
    run_id = f"run-{datetime.now(tz.utc).strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}"
    run_started_at = datetime.now(tz.utc)
    # Initialize the metrics collected over this run
    metrics = {
        "run_id": run_id,
        "run_started_at": run_started_at,
        "status": "running",
        "symbols_requested": 0,
        "symbols_fetched_ok": 0,
        "symbols_failed": 0,
        "failed_symbols_list": [],
        "news_imported": 0,
        "news_duplicates": 0,
        "price_bars_updated": 0,
        "snapshot_generated": False,
        "commentary_generated": False,
    }
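    # These keys mirror columns on the PipelineRunMetrics model; they are
    # persisted in the finally block at the end of this function.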

    # Acquire the pipeline lock without waiting (timeout=0); if another run
    # holds it, skip this one rather than queue behind it.
    lock = PipelineLock(timeout=0)
    if not lock.acquire():
        logger.warning("Could not acquire lock - another pipeline may be running")
        return

    try:
        # Import here to avoid circular imports
        from app.data_manager import fetch_all
        from app.ai_engine import run_full_pipeline
        from app.inference import generate_analysis_report, save_analysis_snapshot
        from app.db import SessionLocal

        settings = get_settings()

        # Record symbol set info
        metrics["symbol_set_name"] = settings.symbol_set
        metrics["symbols_requested"] = len(settings.training_symbols)

        # Step 1: Fetch data
        logger.info("Step 1/4: Fetching data...")
        fetch_results = fetch_all(news=True, prices=True)

        # Track news stats
        news_stats = fetch_results.get('news', {})
        metrics["news_imported"] = news_stats.get('imported', 0)
        metrics["news_duplicates"] = news_stats.get('duplicates', 0)

        # Track per-symbol price stats
        price_stats = fetch_results.get('prices', {})
        total_bars = 0
        failed_symbols = []
        for symbol, stats in price_stats.items():
            if stats.get('error'):
                failed_symbols.append(symbol)
            else:
                total_bars += stats.get('imported', 0)
        metrics["price_bars_updated"] = total_bars
        metrics["symbols_fetched_ok"] = len(price_stats) - len(failed_symbols)
        metrics["symbols_failed"] = len(failed_symbols)
        metrics["failed_symbols_list"] = failed_symbols

        logger.info(f"Data fetch complete: {metrics['news_imported']} news, {total_bars} price bars")
        if failed_symbols:
            logger.warning(f"Failed symbols: {failed_symbols}")

        # Step 2: Run AI pipeline (score + aggregate only, no retraining by default)
        logger.info("Step 2/4: Running AI pipeline (sentiment scoring)...")
        ai_results = run_full_pipeline(
            target_symbol=settings.target_symbol,
            score_sentiment=True,
            aggregate_sentiment=True,
            train_model=False  # Don't retrain daily, just refresh sentiment
        )
        logger.info(f"AI pipeline complete: {ai_results.get('scored_articles', 0)} articles scored, "
                    f"{ai_results.get('aggregated_days', 0)} days aggregated")

        # Step 3: Generate fresh snapshot
        logger.info("Step 3/4: Generating analysis snapshot...")
        with SessionLocal() as session:
            report = generate_analysis_report(session, settings.target_symbol)
            if report:
                save_analysis_snapshot(session, report, settings.target_symbol)
                metrics["snapshot_generated"] = True
                logger.info(f"Snapshot generated: predicted return {report.get('predicted_return', 'N/A')}")

                # Step 4: Generate AI commentary
                logger.info("Step 4/4: Generating AI commentary...")
                try:
                    from app.commentary import generate_and_save_commentary
                    from app.async_bridge import run_async_from_sync
                    from sqlalchemy import func
                    from app.models import NewsArticle
                    from datetime import timedelta

                    # Count news articles from the last 7 days (UTC, matching
                    # the timestamps used elsewhere in this run)
                    week_ago = datetime.now(tz.utc) - timedelta(days=7)
                    news_count = session.query(func.count(NewsArticle.id)).filter(
                        NewsArticle.published_at >= week_ago
                    ).scalar() or 0

                    commentary = run_async_from_sync(
                        generate_and_save_commentary,
                        session=session,
                        symbol=settings.target_symbol,
                        current_price=report.get('current_price', 0),
                        predicted_price=report.get('predicted_price', 0),
                        predicted_return=report.get('predicted_return', 0),
                        sentiment_index=report.get('sentiment_index', 0),
                        sentiment_label=report.get('sentiment_label', 'Neutral'),
                        top_influencers=report.get('top_influencers', []),
                        news_count=news_count,
                    )
                    if commentary:
                        metrics["commentary_generated"] = True
                        logger.info("AI commentary generated and saved")
                    else:
                        logger.warning("AI commentary generation skipped (API key not configured or failed)")
                except Exception as ce:
                    logger.error(f"AI commentary generation failed: {ce}")
            else:
                logger.warning("Could not generate analysis snapshot")

        metrics["status"] = "success"
        logger.info("Daily pipeline complete!")

    except Exception as e:
        metrics["status"] = "failed"
        metrics["error_message"] = str(e)[:500]
        logger.error(f"Daily pipeline failed: {e}", exc_info=True)
    finally:
        lock.release()

        # Save run metrics to the database (best-effort: runs for both
        # successful and failed pipelines, and never raises past here)
        try:
            from app.db import SessionLocal
            from app.models import PipelineRunMetrics

            run_completed_at = datetime.now(tz.utc)
            duration = (run_completed_at - run_started_at).total_seconds()

            with SessionLocal() as session:
                metrics_record = PipelineRunMetrics(
                    run_id=metrics["run_id"],
                    run_started_at=metrics["run_started_at"],
                    run_completed_at=run_completed_at,
                    duration_seconds=duration,
                    symbol_set_name=metrics.get("symbol_set_name"),
                    symbols_requested=metrics.get("symbols_requested"),
                    symbols_fetched_ok=metrics.get("symbols_fetched_ok"),
                    symbols_failed=metrics.get("symbols_failed"),
                    failed_symbols_list=json.dumps(metrics.get("failed_symbols_list", [])),
                    news_imported=metrics.get("news_imported"),
                    news_duplicates=metrics.get("news_duplicates"),
                    price_bars_updated=metrics.get("price_bars_updated"),
                    snapshot_generated=metrics.get("snapshot_generated", False),
                    commentary_generated=metrics.get("commentary_generated", False),
                    status=metrics.get("status", "unknown"),
                    error_message=metrics.get("error_message"),
                )
                session.add(metrics_record)
                session.commit()
            logger.info(f"Pipeline metrics saved: {run_id} ({metrics['status']}, {duration:.1f}s)")
        except Exception as me:
            logger.warning(f"Could not save pipeline metrics: {me}")


def parse_schedule_time(time_str: str) -> tuple[int, int]:
    """Parse an HH:MM time string into an (hour, minute) tuple."""
    try:
        parts = time_str.split(":")
        hour = int(parts[0])
        minute = int(parts[1]) if len(parts) > 1 else 0
        # Reject out-of-range values so CronTrigger never sees e.g. hour=25
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            raise ValueError(f"time out of range: {time_str}")
        return hour, minute
    except (ValueError, IndexError):
        logger.warning(f"Invalid schedule time '{time_str}', defaulting to 09:00")
        return 9, 0
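
# Examples: parse_schedule_time("09:30") -> (9, 30); parse_schedule_time("7")
# -> (7, 0); parse_schedule_time("25:00") falls back to (9, 0) with a warning.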


def start_scheduler():
    """Start the background scheduler."""
    global _scheduler

    if _scheduler is not None:
        logger.warning("Scheduler already running")
        return

    settings = get_settings()
    if not settings.scheduler_enabled:
        logger.info("Scheduler is disabled by configuration")
        return

    # Parse schedule time
    hour, minute = parse_schedule_time(settings.schedule_time)

    # Resolve the configured timezone, falling back to UTC
    try:
        tz = pytz.timezone(settings.tz)
    except Exception:
        logger.warning(f"Invalid timezone '{settings.tz}', using UTC")
        tz = pytz.UTC

    # Create scheduler
    _scheduler = BackgroundScheduler(
        timezone=tz,
        job_defaults={
            "coalesce": True,            # Collapse missed runs into one
            "max_instances": 1,          # Only one instance at a time
            "misfire_grace_time": 3600,  # 1 hour grace for misfires
        }
    )

    # Add the daily job
    trigger = CronTrigger(hour=hour, minute=minute, timezone=tz)
    _scheduler.add_job(
        run_daily_pipeline,
        trigger=trigger,
        id="daily_pipeline",
        name="Daily Data + AI Pipeline",
        replace_existing=True
    )

    # Start and log the next run time
    _scheduler.start()
    logger.info(f"Scheduler started - daily pipeline at {hour:02d}:{minute:02d} {settings.tz}")
    job = _scheduler.get_job("daily_pipeline")
    if job and job.next_run_time:
        logger.info(f"Next scheduled run: {job.next_run_time}")


def stop_scheduler():
    """Stop the background scheduler gracefully."""
    global _scheduler
    if _scheduler is None:
        return
    try:
        _scheduler.shutdown(wait=False)
        logger.info("Scheduler stopped")
    except Exception as e:
        logger.error(f"Error stopping scheduler: {e}")
    finally:
        _scheduler = None


def get_scheduler_status() -> dict:
    """Get current scheduler status."""
    if _scheduler is None:
        return {
            "running": False,
            "next_run": None,
            "job_count": 0
        }

    jobs = _scheduler.get_jobs()

    # Find the earliest next_run_time across all jobs
    next_run = None
    for job in jobs:
        if job.next_run_time:
            if next_run is None or job.next_run_time < next_run:
                next_run = job.next_run_time

    return {
        "running": _scheduler.running,
        "next_run": next_run.isoformat() if next_run else None,
        "job_count": len(jobs)
    }
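
# Example payload with the daily job scheduled (illustrative values):
#   {"running": True, "next_run": "2025-01-02T09:00:00+00:00", "job_count": 1}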


def trigger_pipeline_now():
    """
    Manually trigger the pipeline immediately.
    For CLI or administrative use.

    Runs synchronously in the caller's thread; the pipeline lock still
    prevents overlap with a concurrently scheduled run.
    """
    logger.info("Manual pipeline trigger requested")
    run_daily_pipeline()
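
# e.g. from a one-off shell (a sketch; just calls the function above):
#   python -c "from app.scheduler import trigger_pipeline_now; trigger_pipeline_now()"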