File size: 3,248 Bytes
9d29748
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
Background Data Ingestion Tasks.

Celery tasks for async data pulls when a broker is available,
or synchronous fallback functions when running without Celery.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


def _run_async(coro):
    """Run a coroutine to completion from synchronous code and return its result.

    Three situations are handled:

    * An event loop is already running in this thread: the coroutine is
      executed with ``asyncio.run`` on a temporary worker thread, and the
      caller blocks until it finishes.
    * A current event loop exists and is neither running nor closed: the
      coroutine is driven on that loop with ``run_until_complete``.
    * There is no usable current loop: ``asyncio.run`` creates a fresh one.

    Args:
        coro: The coroutine object to execute.

    Returns:
        The value returned by the coroutine.

    Raises:
        Exception: Whatever the coroutine raises is propagated unchanged.
    """
    # Only loop *discovery* is guarded by ``except RuntimeError``. Guarding the
    # execution as well would catch a RuntimeError raised by the coroutine
    # itself and then retry an already-awaited coroutine, hiding the real error.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_is_running = False
    else:
        loop_is_running = True

    if loop_is_running:
        # This thread cannot block on a second loop, so run on a worker thread.
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop is set for this thread.
        loop = None

    if loop is None or loop.is_closed():
        return asyncio.run(coro)
    return loop.run_until_complete(coro)


def ingest_prices(tickers: List[str], period: str = "1y") -> Dict[str, Any]:
    """Fetch price history for every ticker and report a per-ticker outcome.

    Each ticker is fetched independently through
    ``yahoo_adapter.fetch_price_history``; a failure for one ticker is logged
    and recorded without stopping the remaining tickers.

    Args:
        tickers: Ticker symbols to fetch.
        period: Passed through to the adapter as its ``period`` argument.

    Returns:
        A dict keyed by ticker. Successful entries look like
        ``{"status": "success", "records": n}``, where ``n`` is the length of
        the ``"data"`` entry of the adapter result (0 when that key is
        absent). Failed entries look like
        ``{"status": "error", "error": "<exception text>"}``.
    """
    # Imported lazily, as in the rest of this module.
    from app.services.data_ingestion.yahoo import yahoo_adapter

    outcome = {}
    for symbol in tickers:
        try:
            history = _run_async(
                yahoo_adapter.fetch_price_history(symbol, period=period)
            )
            record_count = len(history.get("data", []))
        except Exception as exc:
            outcome[symbol] = {"status": "error", "error": str(exc)}
            logger.error("Price ingestion failed for %s: %s", symbol, exc)
        else:
            outcome[symbol] = {"status": "success", "records": record_count}

    return outcome


def ingest_news(query: str, page_size: int = 20) -> Dict[str, Any]:
    """Fetch news articles matching a query and summarise the result.

    Args:
        query: Search string handed to ``news_adapter.fetch_articles``.
        page_size: Passed through to the adapter as its ``page_size`` argument.

    Returns:
        On success, ``{"status": "success", "total_results": t,
        "articles_fetched": k}``, where ``t`` is the adapter result's
        ``"total_results"`` entry (0 when absent) and ``k`` is the length of
        its ``"articles"`` entry (0 when absent). On any exception,
        ``{"status": "error", "error": "<exception text>"}`` after logging it.
    """
    # Imported lazily, as in the rest of this module.
    from app.services.data_ingestion.news import news_adapter

    try:
        payload = _run_async(
            news_adapter.fetch_articles(query, page_size=page_size)
        )
        total_results = payload.get("total_results", 0)
        articles_fetched = len(payload.get("articles", []))
    except Exception as exc:
        logger.error("News ingestion failed for '%s': %s", query, exc)
        return {"status": "error", "error": str(exc)}

    return {
        "status": "success",
        "total_results": total_results,
        "articles_fetched": articles_fetched,
    }


def compute_features(ticker: str, period: str = "1y") -> Dict[str, Any]:
    """Load a ticker's price frame and run the feature pipeline over it.

    Args:
        ticker: Symbol whose prices are loaded via
            ``yahoo_adapter.get_price_dataframe``.
        period: Passed through to the adapter as its ``period`` argument.

    Returns:
        ``{"status": "error", "error": "No price data"}`` when the loaded
        frame is empty. On success, a dict with ``"status": "success"``, the
        ``"ticker"``, ``"features_computed"`` (the number of columns in the
        pipeline output that are not in the input frame) and ``"rows"`` (the
        length of the pipeline output). On any exception,
        ``{"status": "error", "error": "<exception text>"}`` after logging it.
    """
    # Imported lazily, as in the rest of this module.
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.feature_engineering.pipeline import feature_pipeline

    try:
        prices = _run_async(
            yahoo_adapter.get_price_dataframe(ticker, period=period)
        )
        if prices.empty:
            return {"status": "error", "error": "No price data"}

        enriched = feature_pipeline.compute_all_features(prices)
        # Count only the columns the pipeline added on top of the raw prices.
        added_columns = sum(
            1 for column in enriched.columns if column not in prices.columns
        )
        row_count = len(enriched)
    except Exception as exc:
        logger.error("Feature computation failed for %s: %s", ticker, exc)
        return {"status": "error", "error": str(exc)}

    return {
        "status": "success",
        "ticker": ticker,
        "features_computed": added_columns,
        "rows": row_count,
    }


# Register the functions above as Celery tasks when a Celery app is available.
# Registration is best-effort: on any failure the plain synchronous functions
# stay in place, but the reason is logged rather than silently discarded.
try:
    from app.celery_app import get_celery_app

    app = get_celery_app()
    if app:
        ingest_prices = app.task(name="ingest_prices")(ingest_prices)
        ingest_news = app.task(name="ingest_news")(ingest_news)
        compute_features = app.task(name="compute_features")(compute_features)
except ImportError:
    # Running without Celery is a supported mode, so this is not a warning.
    logging.getLogger(__name__).debug(
        "Celery app not importable; ingestion tasks remain synchronous functions"
    )
except Exception:
    # Anything else (e.g. a broken Celery configuration) should be visible.
    logging.getLogger(__name__).warning(
        "Celery task registration failed; falling back to synchronous functions",
        exc_info=True,
    )