""" Background Data Ingestion Tasks. Celery tasks for async data pulls when a broker is available, or synchronous fallback functions when running without Celery. """ from __future__ import annotations import asyncio import logging from typing import Any, Dict, List logger = logging.getLogger(__name__) def _run_async(coro): """Run an async function from synchronous context.""" try: loop = asyncio.get_event_loop() if loop.is_running(): import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as pool: return pool.submit(asyncio.run, coro).result() return loop.run_until_complete(coro) except RuntimeError: return asyncio.run(coro) def ingest_prices(tickers: List[str], period: str = "1y") -> Dict[str, Any]: """Ingest price data for a list of tickers.""" from app.services.data_ingestion.yahoo import yahoo_adapter results = {} for ticker in tickers: try: data = _run_async(yahoo_adapter.fetch_price_history(ticker, period=period)) results[ticker] = { "status": "success", "records": len(data.get("data", [])), } except Exception as e: results[ticker] = {"status": "error", "error": str(e)} logger.error("Price ingestion failed for %s: %s", ticker, e) return results def ingest_news(query: str, page_size: int = 20) -> Dict[str, Any]: """Ingest news articles for a query.""" from app.services.data_ingestion.news import news_adapter try: data = _run_async(news_adapter.fetch_articles(query, page_size=page_size)) return { "status": "success", "total_results": data.get("total_results", 0), "articles_fetched": len(data.get("articles", [])), } except Exception as e: logger.error("News ingestion failed for '%s': %s", query, e) return {"status": "error", "error": str(e)} def compute_features(ticker: str, period: str = "1y") -> Dict[str, Any]: """Compute technical features for a ticker.""" from app.services.data_ingestion.yahoo import yahoo_adapter from app.services.feature_engineering.pipeline import feature_pipeline try: df = _run_async(yahoo_adapter.get_price_dataframe(ticker, period=period)) if df.empty: return {"status": "error", "error": "No price data"} featured = feature_pipeline.compute_all_features(df) return { "status": "success", "ticker": ticker, "features_computed": len([c for c in featured.columns if c not in df.columns]), "rows": len(featured), } except Exception as e: logger.error("Feature computation failed for %s: %s", ticker, e) return {"status": "error", "error": str(e)} # Register with Celery if available try: from app.celery_app import get_celery_app app = get_celery_app() if app: ingest_prices = app.task(name="ingest_prices")(ingest_prices) ingest_news = app.task(name="ingest_news")(ingest_news) compute_features = app.task(name="compute_features")(compute_features) except Exception: pass