Spaces:
Running
Running
File size: 3,248 Bytes
9d29748 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | """
Background Data Ingestion Tasks.
Celery tasks for async data pulls when a broker is available,
or synchronous fallback functions when running without Celery.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict, List
# Module-level logger used by every ingestion helper in this file.
logger = logging.getLogger(name=__name__)
def _run_async(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Three situations are handled:

    * An event loop is already running in this thread (e.g. the caller is
      itself inside async code): the coroutine is executed with
      ``asyncio.run`` on a separate worker thread, and the calling thread
      blocks until it finishes.
    * No loop is running and ``asyncio.get_event_loop()`` returns an open
      loop: the coroutine runs on that loop, so repeated calls reuse it.
    * ``asyncio.get_event_loop()`` raises ``RuntimeError`` or returns a
      closed loop: the coroutine runs on a fresh loop via ``asyncio.run``.

    The ``try`` blocks below guard only loop *discovery*, never the
    coroutine's execution.  A coroutine that has already started cannot be
    awaited again, so retrying it after it raised would fail with "cannot
    reuse already awaited coroutine" and hide the real error.  Exceptions
    raised by the coroutine therefore propagate unchanged.

    Args:
        coro: A coroutine object that has not been awaited yet.

    Returns:
        Whatever the coroutine returns.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # No loop is running in this thread; handled below.
    else:
        import concurrent.futures

        # A running loop cannot be re-entered, so run the coroutine on its own
        # loop in a worker thread.  NOTE: waiting on .result() blocks the
        # currently running loop until the worker finishes.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop is available for this thread (e.g. a non-main thread).
        return asyncio.run(coro)
    if loop.is_closed():
        # run_until_complete would refuse a closed loop; use a fresh one.
        return asyncio.run(coro)
    return loop.run_until_complete(coro)
def ingest_prices(tickers: List[str], period: str = "1y") -> Dict[str, Any]:
    """Fetch price history for each ticker and report a per-ticker outcome.

    Every ticker is fetched independently through
    ``yahoo_adapter.fetch_price_history``; a failure for one ticker is
    recorded and logged without stopping the remaining ones.

    Args:
        tickers: Ticker symbols to fetch.
        period: History window forwarded unchanged to the adapter.

    Returns:
        A dict keyed by ticker.  Successful entries look like
        ``{"status": "success", "records": n}``, where ``n`` is the length of
        the payload's ``"data"`` entry (0 when the key is absent).  Failed
        entries look like ``{"status": "error", "error": message}``.
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter

    summary: Dict[str, Any] = {}
    for symbol in tickers:
        try:
            payload = _run_async(
                yahoo_adapter.fetch_price_history(symbol, period=period)
            )
            record_count = len(payload.get("data", []))
            summary[symbol] = {"status": "success", "records": record_count}
        except Exception as exc:
            # Best-effort per ticker: record the failure and keep going.
            summary[symbol] = {"status": "error", "error": str(exc)}
            logger.error("Price ingestion failed for %s: %s", symbol, exc)
    return summary
def ingest_news(query: str, page_size: int = 20) -> Dict[str, Any]:
    """Fetch news articles matching *query* and summarise the response.

    Args:
        query: Search query forwarded to ``news_adapter.fetch_articles``.
        page_size: Page size forwarded unchanged to the adapter.

    Returns:
        On success, ``{"status": "success", "total_results": ...,
        "articles_fetched": ...}``.  ``total_results`` is copied from the
        response (0 when absent), and ``articles_fetched`` is the length of its
        ``"articles"`` entry (0 when absent).  On any exception the error is
        logged and ``{"status": "error", "error": message}`` is returned.
    """
    from app.services.data_ingestion.news import news_adapter

    try:
        response = _run_async(
            news_adapter.fetch_articles(query, page_size=page_size)
        )
        outcome = {
            "status": "success",
            "total_results": response.get("total_results", 0),
            "articles_fetched": len(response.get("articles", [])),
        }
    except Exception as exc:
        logger.error("News ingestion failed for '%s': %s", query, exc)
        outcome = {"status": "error", "error": str(exc)}
    return outcome
def compute_features(ticker: str, period: str = "1y") -> Dict[str, Any]:
    """Load price data for *ticker* and run the feature pipeline over it.

    Args:
        ticker: Ticker symbol whose prices are loaded.
        period: History window forwarded unchanged to the adapter.

    Returns:
        On success, ``{"status": "success", "ticker": ticker,
        "features_computed": k, "rows": r}``.  ``k`` counts the output columns
        that are not columns of the input price frame, and ``r`` is the number
        of output rows.  When the price frame is empty, the function returns
        ``{"status": "error", "error": "No price data"}``.  Any other
        exception is logged and reported as
        ``{"status": "error", "error": message}``.
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.feature_engineering.pipeline import feature_pipeline

    try:
        # presumably a pandas DataFrame (uses .empty / .columns) — TODO confirm
        prices = _run_async(
            yahoo_adapter.get_price_dataframe(ticker, period=period)
        )
        if prices.empty:
            return {"status": "error", "error": "No price data"}

        enriched = feature_pipeline.compute_all_features(prices)
        base_columns = prices.columns
        added_count = sum(1 for col in enriched.columns if col not in base_columns)
        return {
            "status": "success",
            "ticker": ticker,
            "features_computed": added_count,
            "rows": len(enriched),
        }
    except Exception as exc:
        logger.error("Feature computation failed for %s: %s", ticker, exc)
        return {"status": "error", "error": str(exc)}
# Register with Celery if available.  When no Celery app can be imported, the
# plain synchronous functions defined in this module stay in place.
try:
    from app.celery_app import get_celery_app
except ImportError:
    # Expected when running without the Celery integration: stay synchronous.
    pass
else:
    try:
        app = get_celery_app()
        if app:
            ingest_prices = app.task(name="ingest_prices")(ingest_prices)
            ingest_news = app.task(name="ingest_news")(ingest_news)
            compute_features = app.task(name="compute_features")(compute_features)
    except Exception:
        # Still best-effort (the module must import without a working broker),
        # but a broken Celery setup should not fail silently.
        logger.warning(
            "Celery task registration failed; "
            "falling back to synchronous ingestion functions",
            exc_info=True,
        )
|