Asish Karthikeya Gogineni
Feat: Replace limited Alpha Vantage API with free unlimited yfinance data
06b52d2 | # alphavantage_mcp.py (Rewritten to use yfinance for unlimited free data) | |
| from fastapi import FastAPI, HTTPException | |
| import uvicorn | |
| import logging | |
| import yfinance as yf | |
| from datetime import datetime, timedelta | |
# --- Logging Setup ---
# Process-wide logging: INFO and above, each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("MarketData_MCP_Server")

# --- FastAPI App ---
# Application object handed to uvicorn in the __main__ guard of this file.
app = FastAPI(title="Aegis Market Data MCP Server (yfinance)")
async def get_market_data(payload: dict):
    """
    Fetch OHLCV market data for a symbol through yfinance.

    Returns data in the exact same format expected by the orchestrator:
    a mapping of timestamp string -> {"1. open", "2. high", "3. low",
    "4. close", "5. volume"}, with every value rendered as a string.

    Args:
        payload: Request body. ``symbol`` is required. ``time_range`` is
            optional and defaults to ``"INTRADAY"``; supported values are
            "INTRADAY", "1D", "3D", "1W", "1M", "3M" and "1Y". Any other
            value falls back to one month of daily bars.

    Returns:
        ``{"status": "success", "data": {...}, "meta_data": {...}}``.

    Raises:
        HTTPException: 400 when ``symbol`` is missing; 500 when yfinance
            fails or yields no usable rows for the symbol.
    """
    # Function-scope imports keep this handler self-contained (stdlib only).
    import asyncio
    from functools import partial

    symbol = payload.get("symbol")
    time_range = payload.get("time_range", "INTRADAY")

    if not symbol:
        logger.error("Validation Error: 'symbol' is required.")
        raise HTTPException(status_code=400, detail="'symbol' is required.")

    logger.info(f"Received market data request for symbol: {symbol}, time_range: {time_range}")

    # Map our time_range to yfinance period/interval. Unknown values (and
    # non-string values, which cannot be used as dict keys safely) fall back
    # to the default instead of failing.
    period, interval = _DEFAULT_PERIOD_INTERVAL
    if isinstance(time_range, str):
        period, interval = _PERIOD_INTERVAL_BY_RANGE.get(time_range, _DEFAULT_PERIOD_INTERVAL)

    try:
        ticker = yf.Ticker(symbol)

        # Ticker.history() does blocking network I/O. Calling it directly
        # inside a coroutine would stall the event loop for every other
        # request, so it is pushed onto the default thread-pool executor.
        loop = asyncio.get_running_loop()
        df = await loop.run_in_executor(
            None, partial(ticker.history, period=period, interval=interval)
        )

        if df.empty:
            raise Exception(f"No data found for symbol {symbol}")

        logger.info(f"Successfully retrieved {len(df)} data points from yfinance for {symbol}")

        # A "3D" request is served from a 5d download trimmed to the last
        # three trading sessions.
        formatted_data = _format_history(
            df,
            interval,
            last_sessions=3 if time_range == "3D" else None,
        )
        if not formatted_data:
            # Every row was incomplete; treat it like an empty download.
            raise Exception(f"No data found for symbol {symbol}")

        meta_data = {
            "Information": f"Market Data ({time_range})",
            "Symbol": symbol,
            "Source": "Real API (yfinance)"
        }

        return {"status": "success", "data": formatted_data, "meta_data": meta_data}

    except Exception as e:
        logger.error(f"yfinance error for symbol {symbol}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to fetch market data: {str(e)}") from e


# The helpers below are only looked up when the handler runs, so they can
# live after it; this keeps the handler as the first definition of the span.

# Public time_range value -> yfinance (period, interval).
_PERIOD_INTERVAL_BY_RANGE = {
    "INTRADAY": ("1d", "5m"),
    "1D": ("1d", "1m"),
    "3D": ("5d", "15m"),  # yfinance has no 3d period; fetch 5d and trim
    "1W": ("5d", "15m"),  # 5 trading days = 1 week
    "1M": ("1mo", "1d"),
    "3M": ("3mo", "1d"),
    "1Y": ("1y", "1d"),
}
_DEFAULT_PERIOD_INTERVAL = ("1mo", "1d")

# Intervals whose timestamps need a time-of-day component.
_INTRADAY_INTERVALS = frozenset({"1m", "2m", "5m", "15m", "30m", "60m", "1h"})

_OHLCV_COLUMNS = ["Open", "High", "Low", "Close", "Volume"]


def _format_history(df, interval, last_sessions=None):
    """Convert a yfinance history DataFrame into the nested response dict.

    Args:
        df: DataFrame indexed by timestamp with Open/High/Low/Close/Volume
            columns.
        interval: The yfinance interval used for the download; decides
            whether timestamps are rendered with a time-of-day part.
        last_sessions: When set, keep only rows belonging to the most recent
            ``last_sessions`` distinct calendar dates present in the data.

    Returns:
        Dict keyed by formatted timestamp, in the DataFrame's row order.
    """
    # Rows with a missing OHLCV value cannot be serialised (int(NaN) raises
    # and prices would render as "nan"), so they are skipped rather than
    # failing the whole request.
    complete = df.dropna(subset=_OHLCV_COLUMNS)

    # Trim by distinct dates that actually appear in the data rather than by
    # a fixed calendar offset: subtracting three calendar days from the last
    # bar spans weekends/holidays and would keep as little as one session.
    first_kept_date = None
    if last_sessions is not None:
        session_dates = sorted({ts.date() for ts in complete.index})
        if len(session_dates) > last_sessions:
            first_kept_date = session_dates[-last_sessions]

    if interval in _INTRADAY_INTERVALS:
        timestamp_format = "%Y-%m-%d %H:%M:%S"
    else:
        timestamp_format = "%Y-%m-%d"

    formatted = {}
    for idx, row in complete.iterrows():
        if first_kept_date is not None and idx.date() < first_kept_date:
            continue
        formatted[idx.strftime(timestamp_format)] = {
            "1. open": str(round(row["Open"], 2)),
            "2. high": str(round(row["High"], 2)),
            "3. low": str(round(row["Low"], 2)),
            "4. close": str(round(row["Close"], 2)),
            "5. volume": str(int(row["Volume"])),
        }
    return formatted
def read_root():
    """Return a static status message confirming the server is running."""
    status_message = "Aegis Market Data MCP Server (yfinance) is operational."
    return {"message": status_message}
# --- Main Execution ---
# Only start the server when this file is executed directly, not on import.
if __name__ == "__main__":
    # Bound to the loopback interface, so it is reachable from this host only.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8002,
    )