File size: 4,131 Bytes
06b52d2
3e30d53
 
 
06b52d2
 
3e30d53
06b52d2
3e30d53
06b52d2
3e30d53
06b52d2
 
3e30d53
 
 
 
06b52d2
 
 
3e30d53
 
 
 
 
 
 
 
 
 
06b52d2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e30d53
06b52d2
 
3e30d53
06b52d2
 
 
 
3e30d53
06b52d2
 
3e30d53
06b52d2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e30d53
06b52d2
 
 
 
 
 
 
 
3e30d53
06b52d2
 
 
 
 
3e30d53
 
06b52d2
 
 
 
 
3e30d53
 
 
06b52d2
3e30d53
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# alphavantage_mcp.py (Rewritten to use yfinance for unlimited free data)
import asyncio
import logging
import math
from datetime import datetime, timedelta

import uvicorn
import yfinance as yf
from fastapi import FastAPI, HTTPException

# --- Logging Setup ---
# Root logger emits INFO and above as "timestamp - logger name - level - message".
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("MarketData_MCP_Server")

# --- FastAPI App ---
# Application object that the route decorators in this module register against.
app = FastAPI(title="Aegis Market Data MCP Server (yfinance)")

# Maps each supported time_range to the (period, interval) pair passed to yfinance.
_RANGE_TO_QUERY = {
    "INTRADAY": ("1d", "5m"),
    "1D": ("1d", "1m"),
    "3D": ("5d", "15m"),  # yfinance has no 3d period; fetch 5d and trim to 3 sessions
    "1W": ("5d", "15m"),  # 5 trading days = 1 week
    "1M": ("1mo", "1d"),
    "3M": ("3mo", "1d"),
    "1Y": ("1y", "1d"),
}
# Used when time_range is missing from the table above.
_DEFAULT_QUERY = ("1mo", "1d")
# Intervals whose timestamps need a time-of-day component.
_INTRADAY_INTERVALS = frozenset({"1m", "2m", "5m", "15m", "30m", "60m", "1h"})


def _fetch_history(symbol, period, interval):
    """Download price history from yfinance (blocking; run it in a worker thread)."""
    return yf.Ticker(symbol).history(period=period, interval=interval)


def _format_history(df, interval, keep_sessions=None):
    """Convert a yfinance history frame into the numbered-key response mapping.

    Args:
        df: Frame returned by ``Ticker.history`` with a datetime index and
            ``Open``/``High``/``Low``/``Close``/``Volume`` columns.
        interval: The yfinance interval used for the query; decides whether
            timestamps include the time of day.
        keep_sessions: When given, keep only bars that fall on the last
            ``keep_sessions`` distinct dates present in ``df``.

    Returns:
        ``{timestamp: {"1. open": ..., ..., "5. volume": ...}}`` with every
        value rendered as a string.
    """
    if interval in _INTRADAY_INTERVALS:
        timestamp_format = "%Y-%m-%d %H:%M:%S"
    else:
        timestamp_format = "%Y-%m-%d"

    # Select by trading date rather than by a calendar-day offset, so that a
    # weekend or holiday inside the window does not shrink the result.
    allowed_dates = None
    if keep_sessions is not None:
        allowed_dates = set(sorted({ts.date() for ts in df.index})[-keep_sessions:])

    formatted = {}
    for idx, row in df.iterrows():
        if allowed_dates is not None and idx.date() not in allowed_dates:
            continue

        open_, high, low, close = (
            float(row[column]) for column in ("Open", "High", "Low", "Close")
        )
        # A bar with missing prices carries no usable information; skip it
        # instead of emitting "nan" strings.
        if any(math.isnan(price) for price in (open_, high, low, close)):
            continue

        # int(nan) raises, which would otherwise fail the whole request.
        volume = float(row["Volume"])
        volume = 0 if math.isnan(volume) else int(volume)

        formatted[idx.strftime(timestamp_format)] = {
            "1. open": str(round(open_, 2)),
            "2. high": str(round(high, 2)),
            "3. low": str(round(low, 2)),
            "4. close": str(round(close, 2)),
            "5. volume": str(volume),
        }
    return formatted


@app.post("/market_data")
async def get_market_data(payload: dict):
    """
    Fetch OHLCV bars for a symbol via yfinance.

    Request body keys:
        symbol: Ticker symbol (required, non-empty string).
        time_range: One of "INTRADAY", "1D", "3D", "1W", "1M", "3M", "1Y".
            Defaults to "INTRADAY"; any other value falls back to one month
            of daily bars and is logged as a warning.

    Returns:
        ``{"status": "success", "data": {...}, "meta_data": {...}}`` where
        ``data`` maps a timestamp string to "1. open" .. "5. volume" strings.

    Raises:
        HTTPException(400): ``symbol`` is missing, empty, or not a string.
        HTTPException(500): yfinance failed or returned no rows.
    """
    symbol = payload.get("symbol")
    time_range = payload.get("time_range", "INTRADAY")

    if not symbol:
        logger.error("Validation Error: 'symbol' is required.")
        raise HTTPException(status_code=400, detail="'symbol' is required.")
    if not isinstance(symbol, str):
        logger.error("Validation Error: 'symbol' must be a string.")
        raise HTTPException(status_code=400, detail="'symbol' must be a string.")

    logger.info(
        "Received market data request for symbol: %s, time_range: %s",
        symbol,
        time_range,
    )

    # JSON bodies may carry unhashable values (lists/objects); only look up strings.
    query = _RANGE_TO_QUERY.get(time_range) if isinstance(time_range, str) else None
    if query is None:
        logger.warning(
            "Unsupported time_range %r; falling back to period=%s, interval=%s",
            time_range,
            *_DEFAULT_QUERY,
        )
        query = _DEFAULT_QUERY
    period, interval = query

    try:
        # yfinance performs blocking network I/O; run it off the event loop so
        # other requests keep being served while the download is in flight.
        loop = asyncio.get_running_loop()
        df = await loop.run_in_executor(None, _fetch_history, symbol, period, interval)

        if df.empty:
            raise ValueError(f"No data found for symbol {symbol}")

        logger.info(
            "Successfully retrieved %d data points from yfinance for %s",
            len(df),
            symbol,
        )

        # The 3D request fetched 5d of data; trim it to the last 3 sessions.
        keep_sessions = 3 if time_range == "3D" else None
        formatted_data = _format_history(df, interval, keep_sessions=keep_sessions)

        meta_data = {
            "Information": f"Market Data ({time_range})",
            "Symbol": symbol,
            "Source": "Real API (yfinance)",
        }

        return {"status": "success", "data": formatted_data, "meta_data": meta_data}

    except Exception as e:
        # Boundary handler: any upstream failure is reported to the client as a 500.
        logger.error("yfinance error for symbol %s: %s", symbol, e)
        raise HTTPException(
            status_code=500, detail=f"Failed to fetch market data: {str(e)}"
        ) from e

@app.get("/")
def read_root():
    """Return a static status message showing the server is reachable."""
    status_message = "Aegis Market Data MCP Server (yfinance) is operational."
    return {"message": status_message}

# --- Main Execution ---
# When executed as a script, serve the app with uvicorn on the loopback interface.
if __name__ == "__main__":
    bind_host, bind_port = "127.0.0.1", 8002
    uvicorn.run(app, host=bind_host, port=bind_port)