# alphavantage_mcp.py (Rewritten to use yfinance for unlimited free data) from fastapi import FastAPI, HTTPException import uvicorn import logging import yfinance as yf from datetime import datetime, timedelta # --- Logging Setup --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("MarketData_MCP_Server") # --- FastAPI App --- app = FastAPI(title="Aegis Market Data MCP Server (yfinance)") @app.post("/market_data") async def get_market_data(payload: dict): """ Fetches market data using yfinance (free, no rate limits). Returns data in the exact same format expected by the orchestrator. Supports time_ranges: "INTRADAY", "1D", "3D", "1W", "1M", "3M", "1Y" """ symbol = payload.get("symbol") time_range = payload.get("time_range", "INTRADAY") if not symbol: logger.error("Validation Error: 'symbol' is required.") raise HTTPException(status_code=400, detail="'symbol' is required.") logger.info(f"Received market data request for symbol: {symbol}, time_range: {time_range}") # Map our time_range to yfinance period/interval if time_range == "INTRADAY": period = "1d" interval = "5m" elif time_range == "1D": period = "1d" interval = "1m" elif time_range == "3D": period = "5d" # yfinance doesn't have 3d, we'll fetch 5d and filter interval = "15m" elif time_range == "1W": period = "5d" # 5 trading days = 1 week interval = "15m" elif time_range == "1M": period = "1mo" interval = "1d" elif time_range == "3M": period = "3mo" interval = "1d" elif time_range == "1Y": period = "1y" interval = "1d" else: period = "1mo" interval = "1d" try: ticker = yf.Ticker(symbol) df = ticker.history(period=period, interval=interval) if df.empty: raise Exception(f"No data found for symbol {symbol}") logger.info(f"Successfully retrieved {len(df)} data points from yfinance for {symbol}") # Format dataframe into the expected nested dictionary format formatted_data = {} # If we fetched 5d for a 3D request, calculate the cutoff cutoff_date = None if time_range == "3D": cutoff_date = getattr(df.index[-1], "tz_localize", lambda x: df.index[-1])(None) - timedelta(days=3) for idx, row in df.iterrows(): # Filter for 3D request if cutoff_date: # Remove timezone for comparison to avoid offset-naive/aware errors naive_idx = getattr(idx, "tz_localize", lambda x: idx)(None) if naive_idx < cutoff_date: continue # Format timestamp based on whether it's daily or intraday if interval in ["1m", "2m", "5m", "15m", "30m", "60m", "1h"]: timestamp_str = idx.strftime("%Y-%m-%d %H:%M:%S") else: timestamp_str = idx.strftime("%Y-%m-%d") formatted_data[timestamp_str] = { "1. open": str(round(row["Open"], 2)), "2. high": str(round(row["High"], 2)), "3. low": str(round(row["Low"], 2)), "4. close": str(round(row["Close"], 2)), "5. volume": str(int(row["Volume"])) } meta_data = { "Information": f"Market Data ({time_range})", "Symbol": symbol, "Source": "Real API (yfinance)" } return {"status": "success", "data": formatted_data, "meta_data": meta_data} except Exception as e: logger.error(f"yfinance error for symbol {symbol}: {e}") raise HTTPException(status_code=500, detail=f"Failed to fetch market data: {str(e)}") @app.get("/") def read_root(): return {"message": "Aegis Market Data MCP Server (yfinance) is operational."} # --- Main Execution --- if __name__ == "__main__": uvicorn.run(app, host="127.0.0.1", port=8002)