Spaces:
Sleeping
Sleeping
| """ | |
| Historical data module for Financial Market Data MCP Server. | |
| Handles historical price data retrieval. | |
| """ | |
| from datetime import datetime | |
| from typing import Dict | |
| from .data_fetcher import fetch_historical_data | |
| from .validators import validate_ticker, validate_period, validate_interval | |
| from .config import logger | |
def get_historical_data(ticker: str, period: str = "1mo", interval: str = "1d") -> Dict:
    """
    Get historical price data.

    Validates the three inputs, fetches the price history through
    ``fetch_historical_data`` and packages the rows together with a short
    summary. Validation failures, an empty result, and any exception raised
    while fetching or summarising are reported as a dictionary holding
    ``error`` and ``error_code`` instead of being raised.

    Args:
        ticker: Stock ticker symbol
        period: Data period
        interval: Data interval

    Returns:
        On success, a dictionary with the keys ``ticker``, ``period``,
        ``interval``, ``data`` (one dictionary per row), ``summary``
        (``start_date``, ``end_date``, ``num_points``, ``high``, ``low``,
        ``avg_volume``) and ``timestamp``.
        On failure, a dictionary with ``error`` and ``error_code``, where
        ``error_code`` is one of ``INVALID_TICKER``, ``INVALID_PERIOD``,
        ``INVALID_INTERVAL``, ``NO_DATA`` or ``FETCH_ERROR``.
    """
    # Validate all inputs; each validator yields (is_valid, sanitized, error).
    is_valid_ticker, sanitized_ticker, error = validate_ticker(ticker)
    if not is_valid_ticker:
        return {"error": error, "error_code": "INVALID_TICKER"}

    is_valid_period, sanitized_period, error = validate_period(period)
    if not is_valid_period:
        return {"error": error, "error_code": "INVALID_PERIOD"}

    is_valid_interval, sanitized_interval, error = validate_interval(interval)
    if not is_valid_interval:
        return {"error": error, "error_code": "INVALID_INTERVAL"}

    logger.info(f"Fetching historical data: {sanitized_ticker}, {sanitized_period}, {sanitized_interval}")

    try:
        # NOTE(review): `hist` is presumably a pandas DataFrame indexed by
        # date with High/Low/Volume columns -- confirm against data_fetcher.
        hist = fetch_historical_data(sanitized_ticker, sanitized_period, sanitized_interval)

        if hist.empty:
            return {
                "error": f"No historical data found for {sanitized_ticker}",
                "error_code": "NO_DATA"
            }

        # to_dict('records') emits column values only and drops the index,
        # which would leave every row without its date. Attach the index
        # value to each record, formatted like start_date/end_date below.
        # setdefault keeps any column that already uses the same name.
        records = hist.to_dict('records')
        date_key = hist.index.name or "Date"
        for stamp, record in zip(hist.index, records):
            record.setdefault(date_key, str(stamp))

        return {
            "ticker": sanitized_ticker,
            "period": sanitized_period,
            "interval": sanitized_interval,
            "data": records,
            "summary": {
                "start_date": str(hist.index[0]),
                "end_date": str(hist.index[-1]),
                "num_points": len(hist),
                "high": round(hist['High'].max(), 2),
                "low": round(hist['Low'].min(), 2),
                "avg_volume": int(hist['Volume'].mean())
            },
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        # Deliberate catch-all: callers receive an error dictionary rather
        # than an exception. logger.exception records the traceback so the
        # underlying failure can still be diagnosed from the logs.
        logger.exception(f"Error fetching historical data for {sanitized_ticker}: {e}")
        return {
            "error": "Unable to fetch historical data",
            "error_code": "FETCH_ERROR"
        }