quanthedge / backend /app /tasks /data_tasks.py
jashdoshi77's picture
QuantHedge: Full deployment with Docker + nginx + uvicorn
9d29748
"""
Background Data Ingestion Tasks.
Celery tasks for async data pulls when a broker is available,
or synchronous fallback functions when running without Celery.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
def _run_async(coro):
    """Run *coro* to completion from synchronous code and return its result.

    - If an event loop is already running in the calling thread, the coroutine
      cannot be driven with ``run_until_complete`` on it. It is instead run on
      a fresh loop via ``asyncio.run`` in a worker thread, and this call blocks
      until it finishes.
    - Otherwise the thread's current event loop is reused with
      ``run_until_complete``. If there is no current loop, or it is closed, a
      fresh loop is created with ``asyncio.run``.

    Any exception raised by the coroutine propagates unchanged.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # No loop is running in this thread; handled below.
    else:
        # The caller is inside a running loop, so run the coroutine on its own
        # loop in a separate thread and wait for the result.
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    # Only the loop lookup is guarded. A RuntimeError raised by the coroutine
    # itself must propagate, not trigger a second attempt to run the same
    # (already awaited) coroutine.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop for this thread.
        return asyncio.run(coro)
    if loop.is_closed():
        return asyncio.run(coro)
    return loop.run_until_complete(coro)
def ingest_prices(tickers: List[str], period: str = "1y") -> Dict[str, Any]:
    """Fetch price history for every ticker and summarise each outcome.

    Tickers are processed one at a time. An exception for one ticker is logged
    and recorded in the result, and the loop moves on to the next ticker.

    Args:
        tickers: Ticker symbols to fetch.
        period: History window forwarded to ``yahoo_adapter.fetch_price_history``.

    Returns:
        A dict keyed by ticker. Successful entries are
        ``{"status": "success", "records": <number of items under "data">}``.
        Failed entries are ``{"status": "error", "error": <message>}``.
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter

    outcome: Dict[str, Any] = {}
    for symbol in tickers:
        try:
            payload = _run_async(
                yahoo_adapter.fetch_price_history(symbol, period=period)
            )
            record_count = len(payload.get("data", []))
        except Exception as exc:
            outcome[symbol] = {"status": "error", "error": str(exc)}
            logger.error("Price ingestion failed for %s: %s", symbol, exc)
        else:
            outcome[symbol] = {"status": "success", "records": record_count}
    return outcome
def ingest_news(query: str, page_size: int = 20) -> Dict[str, Any]:
    """Fetch news articles matching *query* and report how many were returned.

    Args:
        query: Search query forwarded to ``news_adapter.fetch_articles``.
        page_size: Page size forwarded to the adapter.

    Returns:
        On success, ``{"status": "success", "total_results": ...,
        "articles_fetched": ...}``. ``total_results`` defaults to 0 when absent
        from the response. ``articles_fetched`` is the length of its
        ``"articles"`` list. On any exception, the error is logged and
        ``{"status": "error", "error": <message>}`` is returned instead of
        raising.
    """
    from app.services.data_ingestion.news import news_adapter

    try:
        response = _run_async(
            news_adapter.fetch_articles(query, page_size=page_size)
        )
        summary = {
            "status": "success",
            "total_results": response.get("total_results", 0),
            "articles_fetched": len(response.get("articles", [])),
        }
    except Exception as exc:
        logger.error("News ingestion failed for '%s': %s", query, exc)
        return {"status": "error", "error": str(exc)}
    return summary
def compute_features(ticker: str, period: str = "1y") -> Dict[str, Any]:
    """Load a ticker's price frame and run the feature pipeline over it.

    Args:
        ticker: Symbol whose prices are loaded via
            ``yahoo_adapter.get_price_dataframe``.
        period: History window forwarded to the adapter.

    Returns:
        On success, ``{"status": "success", "ticker": ..., "features_computed": ...,
        "rows": ...}``. ``features_computed`` counts result columns that the
        input price frame does not contain. If the price frame is empty,
        ``{"status": "error", "error": "No price data"}`` is returned. On any
        exception, the error is logged and ``{"status": "error",
        "error": <message>}`` is returned.
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.feature_engineering.pipeline import feature_pipeline

    try:
        prices = _run_async(
            yahoo_adapter.get_price_dataframe(ticker, period=period)
        )
        if prices.empty:
            return {"status": "error", "error": "No price data"}

        enriched = feature_pipeline.compute_all_features(prices)
        # Columns present in the result but absent from the input frame.
        added = sum(
            1 for column in enriched.columns if column not in prices.columns
        )
        return {
            "status": "success",
            "ticker": ticker,
            "features_computed": added,
            "rows": len(enriched),
        }
    except Exception as exc:
        logger.error("Feature computation failed for %s: %s", ticker, exc)
        return {"status": "error", "error": str(exc)}
# Register with Celery if available. This is best-effort: if registration does
# not happen, the plain synchronous functions above remain the module's API.
try:
    from app.celery_app import get_celery_app

    app = get_celery_app()
    if app:
        # Wrap all three before rebinding any name, so a failure part-way
        # through cannot leave the module half-registered.
        ingest_prices, ingest_news, compute_features = (
            app.task(name="ingest_prices")(ingest_prices),
            app.task(name="ingest_news")(ingest_news),
            app.task(name="compute_features")(compute_features),
        )
except ImportError:
    # The Celery app module (or something it imports) is unavailable:
    # expected when running without a broker, so only note it at debug level.
    logging.getLogger(__name__).debug(
        "Celery app not importable; data tasks will run synchronously",
        exc_info=True,
    )
except Exception:
    # Anything else means Celery is present but misconfigured. Keep the
    # synchronous fallback, but do not hide the failure.
    logging.getLogger(__name__).warning(
        "Celery task registration failed; data tasks will run synchronously",
        exc_info=True,
    )