Spaces:
Running
Running
| """ | |
| Background Data Ingestion Tasks. | |
| Celery tasks for async data pulls when a broker is available, | |
| or synchronous fallback functions when running without Celery. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| from typing import Any, Dict, List | |
# Module-wide logger shared by the ingestion helpers below for failure reports.
logger = logging.getLogger(name=__name__)
def _run_async(coro):
    """Run coroutine *coro* to completion from synchronous code and return its result.

    * If an event loop is already running in the calling thread (the caller is
      itself inside async code), that thread cannot block on its own loop, so
      the coroutine is run with ``asyncio.run`` in a one-off worker thread.
    * Otherwise the thread's current event loop is reused when one exists and
      is open; if there is none (or it is closed), ``asyncio.run`` creates a
      temporary loop.

    Exceptions raised by the coroutine propagate to the caller unchanged.
    """
    import concurrent.futures

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False
    else:
        loop_running = True

    if loop_running:
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    # Only the loop lookup is guarded. A RuntimeError raised *by the coroutine*
    # must propagate; retrying the already-consumed coroutine with asyncio.run
    # would replace the real error with "cannot reuse already awaited coroutine".
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop for this thread (e.g. non-main thread, or newer
        # Python versions that no longer create one implicitly).
        return asyncio.run(coro)
    if loop.is_closed():
        return asyncio.run(coro)
    return loop.run_until_complete(coro)
def ingest_prices(tickers: List[str], period: str = "1y") -> Dict[str, Any]:
    """Fetch price history for each ticker and report a per-ticker outcome.

    Each ticker is processed independently: a failure for one symbol is
    recorded in the result (and logged with its traceback) without aborting
    the remaining symbols.

    Args:
        tickers: Symbols to fetch. A single symbol passed as a bare string is
            treated as a one-element list instead of being iterated character
            by character.
        period: History window forwarded to
            ``yahoo_adapter.fetch_price_history``.

    Returns:
        Mapping of ticker -> ``{"status": "success", "records": <int>}`` where
        ``records`` is the length of the adapter payload's ``"data"`` entry,
        or ``{"status": "error", "error": <message>}`` on failure.

    Raises:
        ImportError: if the Yahoo adapter module cannot be imported (the import
            is outside the per-ticker error handling).
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter

    if isinstance(tickers, str):
        # e.g. ingest_prices.delay("AAPL"): without this, "AAPL" would be
        # processed as the tickers "A", "A", "P", "L".
        tickers = [tickers]

    results: Dict[str, Any] = {}
    for ticker in tickers:
        try:
            data = _run_async(yahoo_adapter.fetch_price_history(ticker, period=period))
            results[ticker] = {
                "status": "success",
                "records": len(data.get("data", [])),
            }
        except Exception as e:
            results[ticker] = {"status": "error", "error": str(e)}
            # logger.exception keeps the traceback, which str(e) alone loses.
            logger.exception("Price ingestion failed for %s: %s", ticker, e)
    return results
def ingest_news(query: str, page_size: int = 20) -> Dict[str, Any]:
    """Fetch news articles matching *query* and summarise the outcome.

    Args:
        query: Search string forwarded to ``news_adapter.fetch_articles``.
        page_size: Page size forwarded to ``news_adapter.fetch_articles``.

    Returns:
        On success ``{"status": "success", "total_results": <from payload,
        default 0>, "articles_fetched": <len of payload "articles">}``;
        on any failure during the fetch ``{"status": "error", "error": <message>}``.

    Raises:
        ImportError: if the news adapter module cannot be imported (the import
            is outside the error handling).
    """
    from app.services.data_ingestion.news import news_adapter

    try:
        data = _run_async(news_adapter.fetch_articles(query, page_size=page_size))
        return {
            "status": "success",
            "total_results": data.get("total_results", 0),
            "articles_fetched": len(data.get("articles", [])),
        }
    except Exception as e:
        # logger.exception keeps the traceback; the returned dict only carries str(e).
        logger.exception("News ingestion failed for '%s': %s", query, e)
        return {"status": "error", "error": str(e)}
def compute_features(ticker: str, period: str = "1y") -> Dict[str, Any]:
    """Compute technical features for *ticker* and summarise the result.

    Loads the ticker's price dataframe via ``yahoo_adapter.get_price_dataframe``
    and runs ``feature_pipeline.compute_all_features`` on it. Assumes the
    adapter returns a pandas-like DataFrame (``.empty`` / ``.columns`` are
    used) -- TODO confirm against the adapter.

    Args:
        ticker: Symbol whose prices are loaded.
        period: History window forwarded to the adapter.

    Returns:
        On success ``{"status": "success", "ticker": ticker,
        "features_computed": <number of result columns not in the input>,
        "rows": <row count of the result>}``. If no price data is returned,
        ``{"status": "error", "error": "No price data"}``; on any other
        failure ``{"status": "error", "error": <message>}``.

    Raises:
        ImportError: if the adapter or pipeline module cannot be imported
            (the imports are outside the error handling).
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.feature_engineering.pipeline import feature_pipeline

    try:
        df = _run_async(yahoo_adapter.get_price_dataframe(ticker, period=period))
        if df is None or df.empty:
            return {"status": "error", "error": "No price data"}
        # Snapshot the input columns *before* running the pipeline: if
        # compute_all_features adds its columns to ``df`` in place, comparing
        # against ``df.columns`` afterwards would report zero new features.
        source_columns = set(df.columns)
        featured = feature_pipeline.compute_all_features(df)
        return {
            "status": "success",
            "ticker": ticker,
            "features_computed": sum(1 for c in featured.columns if c not in source_columns),
            "rows": len(featured),
        }
    except Exception as e:
        # logger.exception keeps the traceback; the returned dict only carries str(e).
        logger.exception("Feature computation failed for %s: %s", ticker, e)
        return {"status": "error", "error": str(e)}
# Register the ingestion functions as Celery tasks when a Celery app is
# configured; otherwise they remain plain synchronous functions.
# NOTE: the module-level name ``app`` is kept as-is in case Celery app
# discovery (``celery -A ...``) relies on it.
try:
    from app.celery_app import get_celery_app

    app = get_celery_app()
    if app:
        ingest_prices = app.task(name="ingest_prices")(ingest_prices)
        ingest_news = app.task(name="ingest_news")(ingest_news)
        compute_features = app.task(name="compute_features")(compute_features)
except ImportError:
    # Celery / the project's celery_app module is unavailable: this is the
    # expected "run without a broker" mode, so the synchronous fallback is
    # used silently.
    pass
except Exception:
    # Celery is importable but registration failed (e.g. misconfiguration).
    # Keep the synchronous fallback, but make the failure visible instead of
    # silently swallowing it.
    logger.exception(
        "Celery task registration failed; falling back to synchronous execution"
    )