finance-data-mcp / src /validators.py
dlrklc's picture
Initial commit: Gradio MCP app for real-time financial data
7169bc5
"""
Input validation module for Financial Market Data MCP Server.
Handles validation and sanitization of user inputs.
"""
import json
import re
import logging
from typing import Tuple, Dict, Callable
from .config import (
ALLOWED_TICKER_PATTERN,
MAX_TICKERS_PER_REQUEST,
ALLOWED_PERIODS,
ALLOWED_INTERVALS,
ALLOWED_METRICS
)
from .data_fetcher import get_cached_stock_data
logger = logging.getLogger(__name__)
def _ticker_has_market_data(ticker: str, fetcher: Callable) -> bool:
"""Return True if the fetcher can return market data for ticker."""
try:
_, hist = fetcher(ticker)
return not hist.empty
except Exception as exc:
logger.warning(f"Ticker existence check failed for {ticker}: {exc}")
return False
def validate_ticker(
ticker: str,
*,
check_exists: bool = False,
fetcher: Callable = None
) -> Tuple[bool, str, str]:
"""
Validate and sanitize ticker symbol.
Args:
ticker: Raw ticker input
Returns:
Tuple of (is_valid, sanitized_ticker, error_message)
check_exists: When True, ensure the ticker returns real market data
fetcher: Optional callable returning (info, history). Defaults to get_cached_stock_data
Returns:
Tuple of (is_valid, sanitized_ticker, error_message)
"""
if not ticker:
return False, "", "Ticker symbol is required"
# Remove whitespace and convert to uppercase
sanitized = ticker.strip().upper()
# Check length
if len(sanitized) > 5:
return False, "", "Ticker symbol too long (max 5 characters)"
# Check format (only uppercase letters)
if not re.match(ALLOWED_TICKER_PATTERN, sanitized):
logger.warning(f"Invalid ticker format attempted: {ticker}")
return False, "", "Invalid ticker format. Use 1-5 uppercase letters only."
# Check for common injection patterns
dangerous_patterns = [';', '--', '/*', '*/', 'DROP', 'DELETE', 'INSERT']
if any(pattern in sanitized for pattern in dangerous_patterns):
logger.error(f"Potential SQL injection attempt: {ticker}")
return False, "", "Invalid characters in ticker symbol"
if check_exists:
fetch_fn = fetcher or get_cached_stock_data
if not _ticker_has_market_data(sanitized, fetch_fn):
return False, "", f"Ticker '{sanitized}' not found or has no market data"
return True, sanitized, ""
def validate_period(period: str) -> Tuple[bool, str, str]:
"""
Validate period parameter.
Args:
period: Period string
Returns:
Tuple of (is_valid, sanitized_period, error_message)
"""
if period not in ALLOWED_PERIODS:
return False, "", f"Invalid period. Allowed: {', '.join(ALLOWED_PERIODS)}"
return True, period, ""
def validate_interval(interval: str) -> Tuple[bool, str, str]:
"""
Validate interval parameter.
Args:
interval: Interval string
Returns:
Tuple of (is_valid, sanitized_interval, error_message)
"""
if interval not in ALLOWED_INTERVALS:
return False, "", f"Invalid interval. Allowed: {', '.join(ALLOWED_INTERVALS)}"
return True, interval, ""
def validate_metric(metric: str) -> Tuple[bool, str, str]:
"""
Validate comparison metric parameter.
Args:
metric: Metric string
Returns:
Tuple of (is_valid, sanitized_metric, error_message)
"""
if metric not in ALLOWED_METRICS:
return False, "", f"Invalid metric. Allowed: {', '.join(ALLOWED_METRICS)}"
return True, metric, ""
def validate_json_input(json_str: str) -> Tuple[bool, Dict, str]:
"""
Validate and sanitize JSON input.
Args:
json_str: JSON string
Returns:
Tuple of (is_valid, parsed_json, error_message)
"""
try:
data = json.loads(json_str)
# Check if it's a dictionary
if not isinstance(data, dict):
return False, {}, "JSON must be an object/dictionary"
# Limit number of items
if len(data) > MAX_TICKERS_PER_REQUEST:
return False, {}, f"Too many tickers (max {MAX_TICKERS_PER_REQUEST})"
# Validate each ticker in the portfolio
for ticker in data.keys():
is_valid, _, error = validate_ticker(ticker)
if not is_valid:
return False, {}, f"Invalid ticker '{ticker}': {error}"
return True, data, ""
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON input: {e}")
return False, {}, "Invalid JSON format"
except Exception as e:
logger.error(f"Unexpected error validating JSON: {e}")
return False, {}, "Invalid JSON format"