Spaces:
Sleeping
Sleeping
| import api_client | |
| import json | |
| from datetime import datetime, timedelta | |
| import hashlib | |
| import psycopg2 | |
| import psycopg2.extras | |
| import os | |
| from dotenv import load_dotenv | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| import requests | |
| from apscheduler.schedulers.blocking import BlockingScheduler | |
| # Load environment variables from .env file | |
| load_dotenv(override=True) | |
| def connect_to_db(): | |
| """ | |
| Connect to the PostgreSQL database using environment variables. | |
| Returns a connection object. | |
| """ | |
| db_password = os.getenv("DB_PASSWORD") | |
| if not db_password: | |
| raise ValueError( | |
| "Database password not found in environment variables. Please set DB_PASSWORD." | |
| ) | |
| # Get database connection details from environment variables with defaults | |
| db_host = os.getenv("DB_HOST") | |
| db_port = int(os.getenv("DB_PORT")) | |
| db_name = os.getenv("DB_NAME") | |
| db_user = os.getenv("DB_USER") | |
| return psycopg2.connect( | |
| host=db_host, | |
| port=db_port, | |
| database=db_name, | |
| user=db_user, | |
| password=db_password, | |
| cursor_factory=psycopg2.extras.DictCursor, | |
| ) | |
| def verify_mcp_api_key(api_key): | |
| """ | |
| Verify the MCP API key with the key generation server. | |
| Parameters: | |
| - api_key: The MCP API key to verify | |
| Returns: | |
| - Dictionary with success status and message | |
| """ | |
| try: | |
| # Get the key server URL from environment or use default | |
| key_server_url = os.getenv("KEY_SERVER_URL") | |
| response = requests.post( | |
| f"{key_server_url}/api/verifyKey", | |
| json={"apiKey": api_key}, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=10, | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if data.get("valid"): | |
| return {"success": True, "message": "API key is valid"} | |
| else: | |
| return {"success": False, "message": "API key is invalid"} | |
| else: | |
| return { | |
| "success": False, | |
| "message": f"Key verification failed with status {response.status_code}", | |
| } | |
| except requests.exceptions.RequestException as e: | |
| return { | |
| "success": False, | |
| "message": f"Failed to connect to key verification service: {str(e)}", | |
| } | |
| except Exception as e: | |
| return {"success": False, "message": f"Key verification error: {str(e)}"} | |
| def validate_api_configuration( | |
| mcp_api_key, | |
| name, | |
| description, | |
| method, | |
| base_url, | |
| endpoint, | |
| param_keys_values, | |
| header_keys_values, | |
| additional_params, | |
| schedule_interval_minutes, | |
| stop_after_hours, | |
| start_at, # IMPORTANT: Use empty string "" for immediate start (most common case) | |
| ): | |
| """ | |
| TOOL: Validate and store API configuration for monitoring. | |
| PURPOSE: Test an API endpoint and store the configuration if successful. This is STEP 1 | |
| of the monitoring setup process. If validation fails, retry with corrected parameters. If successful, use the returned config_id in activate_monitoring() function. | |
| CRITICAL: Even if success=True, you MUST manually check the 'sample_response' field | |
| before proceeding to activate_monitoring(). The API call may return success=True but contain | |
| error messages (like "401 Unauthorized", "Invalid API key", etc.) in the sample_response. | |
| CRITICAL: Always try to add parameters that will limit the API response to a manageable size. | |
| CRITICAL: Be sure to always clearly inform the user of the config_id after a desired validation result. | |
| CRITICAL: If you don't have an MCP API KEY given by the user, prompt them to get it here: https://mcp-hackathon.vercel.app/ | |
| WORKFLOW: | |
| 1. Call this function to validate API configuration | |
| 2. If success=False: Fix parameters and retry this function | |
| 3. If success=True: MANUALLY INSPECT the 'sample_response' field for errors | |
| 4. If sample_response contains error messages: Fix API parameters and retry validation | |
| 5. If sample_response looks valid: Use config_id in activate_monitoring() to activate monitoring | |
| ARGUMENTS: | |
| - mcp_api_key: MCP API key serves as user identifier. | |
| - name: User-friendly name for the monitoring task | |
| - description: Description of what is being monitored | |
| - method: HTTP method (GET, POST, PUT, DELETE) | |
| - base_url: The base URL of the API | |
| - endpoint: The specific API endpoint | |
| - param_keys_values: Parameter key-value pairs, one per line | |
| - header_keys_values: Header key-value pairs, one per line | |
| - additional_params: Optional JSON string for complex parameters | |
| - schedule_interval_minutes: Minutes between calls | |
| - stop_after_hours: Hours after which to stop (supports decimals, max 168 = 1 week) | |
| - start_at: Optional datetime string for when to start the monitoring. | |
| IMPORTANT: Leave as empty string "" for immediate start (most common use case, always default to this if no start time provided). Only provide a datetime string (e.g., "2024-06-15 09:00:00") if you need to schedule monitoring for a specific future time. | |
| Input Examples: | |
| 1. Simple GET request to monitor stock price: | |
| mcp_api_key: "your_mcp_key_here" | |
| name: "NVDA Stock Price" | |
| description: "Monitor NVIDIA stock price every 30 minutes" | |
| method: "GET" | |
| base_url: "https://api.example.com" | |
| endpoint: "stocks/NVDA" | |
| param_keys_values: "symbol: NVDA\ninterval: 1min" | |
| header_keys_values: "Authorization: Bearer your_token" | |
| additional_params: "{}" | |
| schedule_interval_minutes: 30 | |
| stop_after_hours: 1.5 | |
| start_at: "" | |
| 2. Weather monitoring with free API: | |
| mcp_api_key: "" | |
| name: "Weather Monitor" | |
| description: "Monitor current weather conditions every 2 hours for one week using Open-Meteo free API" | |
| method: "GET" | |
| base_url: "https://api.open-meteo.com" | |
| endpoint: "v1/forecast" | |
| param_keys_values: "latitude: 40.7128\nlongitude: -74.0060\ncurrent: temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m\ntimezone: America/New_York" | |
| header_keys_values: "Content-Type: application/json" | |
| additional_params: "{}" | |
| schedule_interval_minutes: 120 | |
| stop_after_hours: 168 | |
| start_at: "2024-06-15 09:00:00" | |
| Returns: | |
| - Dictionary with success status, config_id (needed for setup_scheduler), message, and sample_response | |
| Example return: | |
| { | |
| "success": True, | |
| "config_id": 123, | |
| "message": "API call tested and stored successfully", | |
| "sample_response": {...}, | |
| "stop_at": "2025-06-11T12:00:00Z", | |
| "start_at": "2025-06-04T12:00:00Z" | |
| } | |
| NEXT STEP: If success=True, call activate_monitoring(config_id, mcp_api_key) to activate monitoring | |
| """ | |
| try: | |
| # Validate input parameters | |
| if not mcp_api_key or not mcp_api_key.strip() or mcp_api_key == "": | |
| mcp_api_key = os.getenv("MCP_API_KEY", "") | |
| if not mcp_api_key or not mcp_api_key.strip(): | |
| return { | |
| "success": False, | |
| "message": "MCP API key is required", | |
| "config_id": None, | |
| } | |
| # Verify the MCP API key with the key generation server | |
| key_verification = verify_mcp_api_key(mcp_api_key) | |
| if not key_verification["success"]: | |
| return { | |
| "success": False, | |
| "message": f"API key verification failed: {key_verification['message']}", | |
| "config_id": None, | |
| } | |
| # Validate required parameters | |
| if not name or not name.strip(): | |
| return { | |
| "success": False, | |
| "message": "Monitoring name is required", | |
| "config_id": None, | |
| } | |
| if not base_url or not base_url.strip(): | |
| return { | |
| "success": False, | |
| "message": "Base URL is required", | |
| "config_id": None, | |
| } | |
| if not method or method not in ["GET", "POST", "PUT", "DELETE"]: | |
| return { | |
| "success": False, | |
| "message": "Valid HTTP method is required (GET, POST, PUT, DELETE)", | |
| "config_id": None, | |
| } | |
| if ( | |
| not isinstance(schedule_interval_minutes, (int, float)) | |
| or schedule_interval_minutes <= 0 | |
| or schedule_interval_minutes > 1440 | |
| ): | |
| return { | |
| "success": False, | |
| "message": "Schedule interval must be between 0 and 1440 minutes", | |
| "config_id": None, | |
| } | |
| if ( | |
| not isinstance(stop_after_hours, (int, float)) | |
| or stop_after_hours < 0.1 | |
| or stop_after_hours > 168 | |
| ): | |
| return { | |
| "success": False, | |
| "message": "Stop after hours must be between 0.1 and 168 hours (1 week max)", | |
| "config_id": None, | |
| } | |
| # Validate start_at if provided | |
| if start_at: | |
| try: | |
| parsed_start_time = datetime.fromisoformat( | |
| start_at.replace("Z", "+00:00") | |
| ) | |
| if parsed_start_time < datetime.now(): | |
| return { | |
| "success": False, | |
| "message": "Start time cannot be in the past", | |
| "config_id": None, | |
| } | |
| except ValueError: | |
| return { | |
| "success": False, | |
| "message": "Invalid start time format", | |
| "config_id": None, | |
| } | |
| else: | |
| parsed_start_time = datetime.now() # Test the API call | |
| result = api_client.call_api( | |
| method=method, | |
| base_url=base_url, | |
| endpoint=endpoint, | |
| param_keys_values=param_keys_values, | |
| header_keys_values=header_keys_values, | |
| additional_params=additional_params, | |
| ) | |
| # Check if the API call failed | |
| if isinstance(result, str) and result.startswith("Error"): | |
| return { | |
| "success": False, | |
| "message": f"API call test failed: {result}", | |
| "config_id": None, | |
| } | |
| # Generate unique config ID and calculate timestamps | |
| config_str = ( | |
| f"{mcp_api_key}_{name}_{base_url}_{endpoint}_{datetime.now().timestamp()}" | |
| ) | |
| config_id = int(hashlib.md5(config_str.encode()).hexdigest()[:7], 16) | |
| # Calculate timestamps | |
| created_at = datetime.now() | |
| stop_at = parsed_start_time + timedelta(hours=float(stop_after_hours)) | |
| # Store configuration | |
| try: | |
| conn = connect_to_db() | |
| cur = conn.cursor() | |
| cur.execute( | |
| """ | |
| INSERT INTO api_configurations ( | |
| config_id, mcp_api_key, name, description, method, | |
| base_url, endpoint, params, headers, additional_params, | |
| is_active, schedule_interval_minutes, start_at, stop_at, created_at | |
| ) VALUES ( | |
| %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, | |
| %s, %s, %s, %s, %s | |
| ) | |
| """, | |
| ( | |
| config_id, | |
| mcp_api_key, | |
| name, | |
| description, | |
| method, | |
| base_url, | |
| endpoint, | |
| json.dumps(api_client.parse_key_value_string(param_keys_values)), | |
| json.dumps(api_client.parse_key_value_string(header_keys_values)), | |
| additional_params, | |
| False, | |
| float(schedule_interval_minutes), | |
| parsed_start_time, | |
| stop_at.isoformat(), | |
| created_at, | |
| ), | |
| ) | |
| conn.commit() | |
| cur.execute("SELECT * FROM api_configurations WHERE id = %s", (config_id,)) | |
| rows = cur.fetchall() | |
| for row in rows: | |
| print(row) | |
| conn.close() | |
| cur.close() | |
| except Exception as db_error: | |
| return { | |
| "success": False, | |
| "message": f"Database error: {str(db_error)}", | |
| "config_id": None, | |
| } | |
| # Return success response | |
| return { | |
| "success": True, | |
| "config_id": config_id, | |
| "message": f"API call tested, validated, and stored successfully for '{name}'. Make sure to review the message manually before activating monitoring. Use this config_id in activate_monitoring() to activate monitoring.", | |
| "sample_response": ( | |
| json.loads(result) | |
| if result.startswith("{") or result.startswith("[") | |
| else result | |
| ), | |
| "start_at": parsed_start_time.isoformat(), | |
| "stop_at": stop_at.isoformat(), | |
| "schedule_interval_minutes": schedule_interval_minutes, | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "message": f"Validation failed with error: {str(e)}", | |
| "config_id": None, | |
| } | |
| async def activate_monitoring(config_id, mcp_api_key): | |
| """ | |
| TOOL: Activate periodic monitoring for a validated API configuration. | |
| PURPOSE: Start automated recurring API calls based on a previously validated configuration. | |
| This is STEP 2 of the monitoring setup process. | |
| PREREQUISITE: Must call validate_api_configuration() first and obtain a config_id from successful validation. Make sure that the sample_response is what you expect | |
| to see before proceeding with this function. | |
| WORKFLOW: | |
| 1. First call validate_api_configuration() to get config_id | |
| 2. If validation successful, call this function with the config_id | |
| 3. Monitoring will run automatically according to the validated schedule | |
| ARGUMENTS: | |
| - config_id: The ID from successful validate_api_configuration() execution (required) | |
| - mcp_api_key: User's MCP API key for verification (must match validation step). | |
| Input Examples: | |
| 1. Activate scheduler for stock monitoring: | |
| config_id: 123456789 | |
| mcp_api_key: "your_mcp_key_here" | |
| 2. Activate scheduler for weather alerts: | |
| config_id: 987654321 | |
| mcp_api_key: "your_mcp_key_here" | |
| NOTE: The config_id must be obtained from a successful validate_api_configuration() response. | |
| The mcp_api_key must match the one used during validation. | |
| Returns: | |
| - Dictionary with success status and scheduling details | |
| Example return: | |
| { | |
| "success": True, | |
| "message": "Scheduler activated for 'NVDA Stock Price'", | |
| "config_id": 123, | |
| "schedule_interval_minutes": 20, | |
| "stop_at": "2025-06-11T12:00:00Z", | |
| "next_call_at": "2025-06-04T12:20:00Z" | |
| } | |
| ERROR HANDLING: If config_id not found or invalid, returns success=False with error message | |
| """ | |
| # need to extract | |
| """ | |
| mcp_api_key, | |
| name, | |
| description, | |
| method, | |
| base_url, | |
| endpoint, | |
| param_keys_values, | |
| header_keys_values, | |
| additional_params, | |
| schedule_interval_minutes, | |
| stop_after_hours, | |
| time_to_start, | |
| this | |
| """ | |
| # Attempt to create the scheduler | |
| try: | |
| if not mcp_api_key or not mcp_api_key.strip() or mcp_api_key == "": | |
| mcp_api_key = os.getenv("MCP_API_KEY", "") | |
| if not mcp_api_key or not mcp_api_key.strip(): | |
| return { | |
| "success": False, | |
| "message": "MCP API key is required", | |
| "config_id": None, | |
| } | |
| # Verify the MCP API key with the key generation server first | |
| key_verification = verify_mcp_api_key(mcp_api_key) | |
| if not key_verification["success"]: | |
| return { | |
| "success": False, | |
| "message": f"API key verification failed: {key_verification['message']}", | |
| "config_id": config_id, | |
| } | |
| conn = connect_to_db() | |
| cur = conn.cursor() | |
| cur.execute( | |
| "SELECT * FROM api_configurations WHERE config_id = %s", (config_id,) | |
| ) | |
| config_row = cur.fetchone() | |
| if not config_row: | |
| conn.close() | |
| return { | |
| "success": False, | |
| "message": "Invalid config_id", | |
| "config_id": config_id, | |
| } | |
| config = dict(config_row) | |
| if config["mcp_api_key"] != mcp_api_key: | |
| conn.close() | |
| return { | |
| "success": False, | |
| "message": "Invalid mcp_api_key. You are not authorized to activate this configuration.", | |
| "config_id": config_id, | |
| } | |
| # Extract scheduling parameters | |
| name = config.get("name", "Unknown") | |
| schedule_interval_minutes = float(config.get("schedule_interval_minutes", 20)) | |
| stop_at = config.get("stop_at") | |
| start_at = config.get("start_at") | |
| if not start_at: | |
| start_at = datetime.now() | |
| else: | |
| if not isinstance(start_at, datetime): | |
| start_at = datetime.fromisoformat(str(start_at)) | |
| if not stop_at: | |
| stop_at = start_at + timedelta(hours=config.get("stop_after_hours", 24)) | |
| else: | |
| if not isinstance(stop_at, datetime): | |
| stop_at = datetime.fromisoformat( | |
| str(stop_at) | |
| ) | |
| # Job function to make actual API calls | |
| def api_monitoring_job(): | |
| now = datetime.now() | |
| next_call = now + timedelta(minutes=schedule_interval_minutes) | |
| print( | |
| f"Executing API monitoring job for {name} at {now.isoformat()}. Next call at {next_call.isoformat()}" | |
| ) | |
| try: | |
| # Extract API configuration parameters | |
| method = config.get("method", "GET") | |
| base_url = config.get("base_url") | |
| endpoint = config.get("endpoint", "") | |
| params = config.get("params", {}) | |
| headers = config.get("headers", {}) | |
| additional_params = config.get("additional_params", {}) | |
| # Convert JSON strings back to dicts if needed | |
| if isinstance(params, str): | |
| params = json.loads(params) if params else {} | |
| if isinstance(headers, str): | |
| headers = json.loads(headers) if headers else {} | |
| if isinstance(additional_params, str): | |
| additional_params = ( | |
| json.loads(additional_params) if additional_params else {} | |
| ) | |
| # Convert params and headers back to key-value string format for api_client | |
| param_keys_values = ( | |
| "\n".join([f"{k}: {v}" for k, v in params.items()]) | |
| if params | |
| else "" | |
| ) | |
| header_keys_values = ( | |
| "\n".join([f"{k}: {v}" for k, v in headers.items()]) | |
| if headers | |
| else "" | |
| ) | |
| additional_params_str = ( | |
| json.dumps(additional_params) if additional_params else "{}" | |
| ) | |
| # Make the actual API call | |
| api_result = api_client.call_api( | |
| method=method, | |
| base_url=base_url, | |
| endpoint=endpoint, | |
| param_keys_values=param_keys_values, | |
| header_keys_values=header_keys_values, | |
| additional_params=additional_params_str, | |
| ) | |
| # Determine if the call was successful | |
| is_successful = not ( | |
| isinstance(api_result, str) and api_result.startswith("Error") | |
| ) | |
| error_message = api_result if not is_successful else None | |
| response_data = api_result if is_successful else None | |
| # Convert response to JSON if it's a string representation | |
| if is_successful and isinstance(response_data, str): | |
| try: | |
| if response_data.startswith("{") or response_data.startswith( | |
| "[" | |
| ): | |
| response_data = json.loads(response_data) | |
| except json.JSONDecodeError: | |
| # Keep as string if not valid JSON | |
| pass | |
| job_conn = connect_to_db() | |
| job_cur = job_conn.cursor() | |
| # Mark config as active (only once, on first run) | |
| if not config["is_active"]: | |
| job_cur.execute( | |
| """ | |
| UPDATE api_configurations SET is_active = %s WHERE config_id = %s | |
| """, | |
| (True, config_id), | |
| ) | |
| job_conn.commit() | |
| print(f"Marked configuration {config_id} as active.") | |
| # Check if this is the last call by comparing current time to stop_at | |
| current_time = datetime.now() | |
| next_call_time = current_time + timedelta( | |
| minutes=schedule_interval_minutes | |
| ) | |
| if next_call_time >= stop_at: | |
| # This is the last call, mark as inactive | |
| job_cur.execute( | |
| """ | |
| UPDATE api_configurations SET is_active = %s WHERE config_id = %s | |
| """, | |
| (False, config_id), | |
| ) | |
| job_conn.commit() | |
| print( | |
| f"Last call for configuration {config_id}. Marked as inactive." | |
| ) | |
| # Insert the actual API call result | |
| job_cur.execute( | |
| """ | |
| INSERT INTO api_call_results ( | |
| config_id, response_data, is_successful, error_message, called_at | |
| ) VALUES (%s, %s, %s, %s, %s) | |
| """, | |
| ( | |
| config_id, | |
| ( | |
| json.dumps(response_data) | |
| if response_data is not None | |
| else None | |
| ), | |
| is_successful, | |
| error_message, | |
| now, | |
| ), | |
| ) | |
| job_conn.commit() | |
| job_cur.close() | |
| job_conn.close() | |
| print( | |
| f"API call result for {name}: {'Success' if is_successful else 'Failed'}" | |
| ) | |
| if not is_successful: | |
| print(f"Error: {error_message}") | |
| except Exception as job_exc: | |
| print(f"API monitoring job error for {name}: {job_exc}") | |
| try: | |
| job_conn = connect_to_db() | |
| job_cur = job_conn.cursor() | |
| job_cur.execute( | |
| """ | |
| INSERT INTO api_call_results ( | |
| config_id, response_data, is_successful, error_message, called_at | |
| ) VALUES (%s, %s, %s, %s, %s) | |
| """, | |
| ( | |
| config_id, | |
| None, | |
| False, | |
| f"Job execution error: {str(job_exc)}", | |
| now, | |
| ), | |
| ) | |
| job_conn.commit() | |
| job_cur.close() | |
| job_conn.close() | |
| except Exception as db_exc: | |
| print( | |
| f"Failed to log error to database: {db_exc}" | |
| ) | |
| # Setup AsyncIO scheduler | |
| scheduler = AsyncIOScheduler() | |
| # Schedule the API monitoring job | |
| scheduler.add_job( | |
| api_monitoring_job, | |
| "interval", | |
| minutes=schedule_interval_minutes, | |
| start_date=start_at, | |
| end_date=stop_at, | |
| id=f"monitor_{config_id}", | |
| ) | |
| scheduler.start() | |
| # Mark config as active (only once, on first run) | |
| if not config["is_active"]: | |
| cur.execute( | |
| """ | |
| UPDATE api_configurations SET is_active = %s WHERE config_id = %s | |
| """, | |
| (True, config_id), | |
| ) | |
| conn.commit() | |
| print(f"Marked configuration {config_id} as active.") | |
| conn.close() | |
| return { | |
| "success": True, | |
| "message": f"Scheduler activated for '{name}'", | |
| "config_id": config_id, | |
| "schedule_interval_minutes": schedule_interval_minutes, | |
| "stop_at": stop_at.isoformat(), | |
| "next_call_at": ( | |
| start_at + timedelta(minutes=schedule_interval_minutes) | |
| ).isoformat(), | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "message": f"Failed to create scheduler: {str(e)}", | |
| "config_id": config_id, | |
| } | |
| def retrieve_monitored_data(config_id, mcp_api_key, mode="summary"): | |
| """ | |
| TOOL: Retrieve monitored data for a specific API configuration. | |
| PURPOSE: Fetch the latest monitored data for a given configuration ID. | |
| This is STEP 3 of the monitoring setup process. | |
| PREREQUISITE: Must call validate_api_configuration() first and obtain a config_id from successful validation, then activate_monitoring() to start monitoring. | |
| This function can be called at any time after monitoring activation to retrieve the latest data collected by the monitoring system. | |
| ARGUMENTS: | |
| - config_id: The ID of the API configuration to retrieve data for (required) | |
| - mcp_api_key: User's MCP API key for verification (must match validation step). | |
| - mode: Data return mode - "summary" (LLM-optimized), "details" (full responses, minimal metadata), "full" (everything) | |
| Input Examples: | |
| 1. Retrieve data for stock monitoring: | |
| config_id: 123456789 | |
| mcp_api_key: "your_mcp_key_here" | |
| 2. Retrieve data for weather alerts: | |
| config_id: 987654321 | |
| mcp_api_key: "your_mcp_key_here" | |
| Returns: | |
| - Dictionary with monitoring status in one of three formats based on mode parameter | |
| SUMMARY mode (LLM-optimized, default): | |
| { | |
| "success": True, | |
| "config_name": "Weather Alert Monitor", | |
| "summary": { | |
| "status": "active", // "active", "inactive" | |
| "health": "good", // "good", "degraded", "no_data" | |
| "calls_made": 15, | |
| "success_rate": 93.3, | |
| "last_call": "2025-06-05T15:20:00", | |
| "last_success": "2025-06-05T15:20:00" | |
| }, | |
| "recent_calls": [ | |
| { | |
| "timestamp": "2025-06-05T15:20:00", | |
| "success": true, | |
| "error": null, | |
| "response_preview": "{'alerts': [{'type': 'tornado'}]}..." // truncated to 150 characters | |
| } | |
| // ... up to 5 most recent calls | |
| ], | |
| "full_data_available": 15, | |
| "monitoring_details": { | |
| "interval_minutes": 20, | |
| "is_finished": false | |
| } | |
| } | |
| DETAILS mode (full responses, minimal metadata): | |
| { | |
| "success": True, | |
| "config_name": "Weather Alert Monitor", | |
| "status": "active", | |
| "calls_made": 15, | |
| "success_rate": 93.3, | |
| "recent_responses": [ | |
| { | |
| "timestamp": "2025-06-05T15:20:00", | |
| "success": true, | |
| "response_data": {...}, // full response data | |
| "error": null | |
| } | |
| // ... up to 10 most recent calls with full responses | |
| ] | |
| } | |
| FULL mode (everything): | |
| { | |
| "success": True, | |
| "config_name": "Weather Alert Monitor", | |
| "config_description": "Monitor severe weather alerts", | |
| "is_active": True, | |
| "is_finished": False, | |
| "progress": {...}, | |
| "schedule_info": {...}, | |
| "data": [...] // all historical data | |
| } | |
| Error return format: | |
| { | |
| "success": False, | |
| "message": "Invalid config_id or mcp_api_key" | |
| } | |
| ERROR HANDLING: If config_id not found or invalid, returns success=False with error message | |
| """ | |
| try: | |
| if not mcp_api_key or not mcp_api_key.strip() or mcp_api_key == "": | |
| mcp_api_key = os.getenv("MCP_API_KEY", "") | |
| if not mcp_api_key or not mcp_api_key.strip(): | |
| return { | |
| "success": False, | |
| "message": "MCP API key is required", | |
| "config_id": None, | |
| } | |
| # Verify the MCP API key with the key generation server first | |
| key_verification = verify_mcp_api_key(mcp_api_key) | |
| if not key_verification["success"]: | |
| return { | |
| "success": False, | |
| "message": f"API key verification failed: {key_verification['message']}", | |
| "data": [], | |
| } | |
| conn = connect_to_db() | |
| cur = conn.cursor() | |
| cur.execute( | |
| "SELECT * FROM api_configurations WHERE config_id = %s", (config_id,) | |
| ) | |
| config_row = cur.fetchone() | |
| if not config_row: | |
| conn.close() | |
| return { | |
| "success": False, | |
| "message": "Invalid config_id", | |
| "data": [], | |
| } | |
| config = dict(config_row) | |
| print(f"Retrieved config: {config}") | |
| if config["mcp_api_key"] != mcp_api_key: | |
| conn.close() | |
| return { | |
| "success": False, | |
| "message": "Invalid mcp_api_key. You are not authorized to access this configuration.", | |
| "data": [], | |
| } | |
| # Query the api_call_results table for monitored data | |
| cur.execute( | |
| "SELECT * FROM api_call_results WHERE config_id = %s ORDER BY called_at DESC", | |
| (config_id,), | |
| ) | |
| monitored_data_rows = cur.fetchall() | |
| # Convert rows to dictionaries and format timestamps | |
| monitored_data = [] | |
| for row in monitored_data_rows: | |
| row_dict = dict(row) | |
| # Format the timestamp for better readability | |
| if row_dict.get("called_at"): | |
| row_dict["called_at"] = row_dict["called_at"].isoformat() | |
| monitored_data.append(row_dict) | |
| # Check if monitoring is finished | |
| now = datetime.now() | |
| stop_at_time = config.get("stop_at") | |
| if stop_at_time: | |
| if hasattr(stop_at_time, "replace"): | |
| stop_at = stop_at_time | |
| else: | |
| stop_at = datetime.fromisoformat( | |
| str(stop_at_time).replace("Z", "+00:00") | |
| ) | |
| is_finished = now > stop_at | |
| else: | |
| is_finished = False | |
| # Calculate progress statistics | |
| total_expected_calls = 0 | |
| if config.get("start_at") and config.get("schedule_interval_minutes"): | |
| start_time = config["start_at"] | |
| if hasattr(start_time, "replace"): | |
| start_dt = start_time | |
| else: | |
| start_dt = datetime.fromisoformat(str(start_time)) | |
| elapsed_minutes = (now - start_dt).total_seconds() / 60 | |
| if elapsed_minutes > 0: | |
| total_expected_calls = max( | |
| 1, int(elapsed_minutes / float(config["schedule_interval_minutes"])) | |
| ) | |
| # Get success/failure counts | |
| successful_calls = len( | |
| [d for d in monitored_data if d.get("is_successful", False)] | |
| ) | |
| failed_calls = len( | |
| [d for d in monitored_data if not d.get("is_successful", True)] | |
| ) | |
| total_calls = len( | |
| monitored_data | |
| ) # Create simplified summary for LLM consumption | |
| summary = { | |
| "status": ( | |
| "active" | |
| if config.get("is_active", False) and not is_finished | |
| else "inactive" | |
| ), | |
| "health": ( | |
| "good" | |
| if total_calls > 0 and (successful_calls / total_calls) > 0.8 | |
| else "degraded" if total_calls > 0 else "no_data" | |
| ), | |
| "calls_made": total_calls, | |
| "success_rate": ( | |
| round(successful_calls / total_calls * 100, 1) if total_calls > 0 else 0 | |
| ), | |
| "last_call": monitored_data[0]["called_at"] if monitored_data else None, | |
| "last_success": next( | |
| (d["called_at"] for d in monitored_data if d.get("is_successful")), None | |
| ), | |
| } | |
| # Handle different return modes | |
| if mode == "full": | |
| # Return complete detailed data (original detailed format) | |
| return { | |
| "success": True, | |
| "message": f"Full data retrieved for config_id {config_id}", | |
| "config_name": config.get("name", "Unknown"), | |
| "config_description": config.get("description", ""), | |
| "is_active": config.get("is_active", False), | |
| "is_finished": is_finished, | |
| "progress": { | |
| "total_calls": total_calls, | |
| "successful_calls": successful_calls, | |
| "failed_calls": failed_calls, | |
| "expected_calls": total_expected_calls, | |
| "success_rate": ( | |
| round(successful_calls / total_calls * 100, 2) | |
| if total_calls > 0 | |
| else 0 | |
| ), | |
| }, | |
| "schedule_info": { | |
| "interval_minutes": config.get("schedule_interval_minutes"), | |
| "start_at": ( | |
| config.get("start_at").isoformat() | |
| if config.get("start_at") | |
| else None | |
| ), | |
| "stop_at": ( | |
| config.get("stop_at").isoformat() | |
| if config.get("stop_at") | |
| else None | |
| ), | |
| }, | |
| "data": monitored_data, | |
| } | |
| elif mode == "details": | |
| # Return full response data but minimal metadata (up to 10 recent calls) | |
| recent_responses = [] | |
| for item in monitored_data[:10]: # Last 10 calls with full responses | |
| recent_responses.append( | |
| { | |
| "timestamp": item["called_at"], | |
| "success": item.get("is_successful", False), | |
| "response_data": item.get( | |
| "response_data" | |
| ), | |
| "error": ( | |
| item.get("error_message") | |
| if not item.get("is_successful") | |
| else None | |
| ), | |
| } | |
| ) | |
| return { | |
| "success": True, | |
| "config_name": config.get("name", "Unknown"), | |
| "status": summary["status"], | |
| "calls_made": total_calls, | |
| "success_rate": summary["success_rate"], | |
| "recent_responses": recent_responses, | |
| } | |
| else: # mode == "summary" (default) | |
| # Get recent data (last 5 calls) with essential info only | |
| recent_data = [] | |
| for item in monitored_data[:5]: # Only last 5 calls | |
| recent_data.append( | |
| { | |
| "timestamp": item["called_at"], | |
| "success": item.get("is_successful", False), | |
| "error": ( | |
| item.get("error_message") | |
| if not item.get("is_successful") | |
| else None | |
| ), | |
| "response_preview": ( | |
| str(item.get("response_data", ""))[:150] + "..." | |
| if item.get("response_data") | |
| else None | |
| ), | |
| } | |
| ) | |
| return { | |
| "success": True, | |
| "config_name": config.get("name", "Unknown"), | |
| "summary": summary, | |
| "recent_calls": recent_data, | |
| "full_data_available": len(monitored_data), | |
| "monitoring_details": { | |
| "interval_minutes": config.get("schedule_interval_minutes"), | |
| "is_finished": is_finished, | |
| }, | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "message": f"Database connection failed: {str(e)}", | |
| "data": [], | |
| } | |