Spaces:
Sleeping
Sleeping
| # app.py (or main.py) | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import numpy as np | |
| import tensorflow as tf | |
| from tensorflow.keras.models import load_model | |
| from tensorflow.keras.layers import Input | |
| from tensorflow.keras.utils import custom_object_scope | |
| import pickle | |
| import os | |
| import requests | |
| import pandas as pd | |
| from datetime import datetime, timedelta, timezone | |
| import pytz | |
| import json | |
| import traceback # Import traceback to print detailed error info | |
| import os | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "-1" | |
| # Assuming TKAN is installed and available | |
| from tkan import TKAN | |
| try: | |
| from tkat import TKAT | |
| except ImportError: | |
| print("TKAT library not found. If your model uses TKAT, ensure the library is installed.") | |
| TKAT = None | |
| # --- Your MinMaxScaler Class (Copied from Notebook) --- | |
| # This class is essential for loading your scalers | |
| class MinMaxScaler: | |
| def __init__(self, feature_axis=None, minmax_range=(0, 1)): | |
| self.feature_axis = feature_axis | |
| self.min_ = None | |
| self.max_ = None | |
| self.scale_ = None | |
| self.minmax_range = minmax_range | |
| def fit(self, X): | |
| if X.ndim == 3 and self.feature_axis is not None: | |
| axis = tuple(i for i in range(X.ndim) if i != self.feature_axis) | |
| self.min_ = np.min(X, axis=axis) | |
| self.max_ = np.max(X, axis=axis) | |
| elif X.ndim == 2: | |
| self.min_ = np.min(X, axis=0) | |
| self.max_ = np.max(X, axis=0) | |
| elif X.ndim == 1: | |
| self.min_ = np.min(X) | |
| self.max_ = np.max(X) | |
| else: | |
| raise ValueError("Data must be 1D, 2D, or 3D.") | |
| self.scale_ = self.max_ - self.min_ | |
| return self | |
| def transform(self, X): | |
| if self.min_ is None or self.max_ is None or self.scale_ is None: | |
| raise ValueError("Scaler has not been fitted.") | |
| X_scaled = (X - self.min_) / self.scale_ | |
| X_scaled = X_scaled * (self.minmax_range[1] - self.minmax_range[0]) + self.minmax_range[0] | |
| return X_scaled | |
| def inverse_transform(self, X_scaled): | |
| if self.min_ is None or self.max_ is None or self.scale_ is None: | |
| raise ValueError("Scaler has not been fitted.") | |
| X = (X_scaled - self.minmax_range[0]) / (self.minmax_range[1] - self.minmax_range[0]) | |
| X = X * self.scale_ + self.min_ | |
| return X | |
| # --- AQI breakpoints and calculation functions (Copied from Notebook) --- | |
| aqi_breakpoints = { | |
| 'pm25': [(0, 50, 0, 50), (51, 100, 51, 100), (101, 200, 101, 200), (201, 300, 201, 300)], | |
| 'pm10': [(0, 50, 0, 50), (51, 100, 51, 100), (101, 250, 101, 200), (251, 350, 201, 300)], | |
| 'co': [(0, 1.0, 0, 50), (1.1, 2.0, 51, 100), (2.1, 10.0, 101, 200), (10.1, 17.0, 201, 300)] | |
| } | |
| def calculate_sub_aqi(concentration, breakpoints): | |
| for i_low, i_high, c_low, c_high in breakpoints: | |
| if c_low <= concentration <= c_high: | |
| if c_high == c_low: | |
| return i_low | |
| return ((i_high - i_low) / (c_high - c_low)) * (concentration - c_low) + i_low | |
| if concentration < breakpoints[0][2]: | |
| return breakpoints[0][0] | |
| elif concentration > breakpoints[-1][3]: | |
| return breakpoints[-1][1] | |
| else: | |
| return np.nan | |
| def calculate_overall_aqi(row, aqi_breakpoints): | |
| sub_aqis = [] | |
| # Mapping API names to internal names if necessary | |
| pollutant_mapping = { | |
| 'pm25': 'pm25', | |
| 'pm10': 'pm10', | |
| 'co': 'co', | |
| 'pm2_5': 'pm25', # Common API name for PM2.5 | |
| 'carbon_monoxide': 'co', # Common API name for CO | |
| } | |
| for api_pollutant, internal_pollutant in pollutant_mapping.items(): | |
| if api_pollutant in row: | |
| concentration = row[api_pollutant] | |
| if not pd.isna(concentration): # Use pd.isna for pandas DataFrames/Series | |
| sub_aqi = calculate_sub_aqi(concentration, aqi_breakpoints.get(internal_pollutant, [])) | |
| sub_aqis.append(sub_aqi) | |
| else: | |
| sub_aqis.append(np.nan) | |
| else: | |
| sub_aqis.append(np.nan) | |
| # Use np.nanmax to find the maximum ignoring NaNs. Returns -inf if all are NaN. | |
| # Check if sub_aqis list is not empty and contains at least one non-NaN value | |
| if sub_aqis and not all(pd.isna(sub_aqis)): | |
| return np.nanmax(sub_aqis) | |
| else: | |
| return np.nan # Return NaN if no valid pollutant data is available | |
| # --- Data Retrieval Function --- | |
| def get_latest_data_sequence(sequence_length: int, latitude: float, longitude: float): | |
| print(f"Attempting to retrieve data for the last {sequence_length} hours from Open-Meteo for Lat: {latitude}, Lon: {longitude}") | |
| end_time = datetime.now(pytz.utc) | |
| # Fetch slightly more data to allow for resampling and ensure sequence_length is met | |
| fetch_hours = sequence_length + 5 | |
| start_time = end_time - timedelta(hours=fetch_hours) | |
| # Format timestamps for API request (ISO 8601) | |
| start_time_str = start_time.isoformat().split('.')[0] + 'Z' | |
| end_time_str = end_time.isoformat().split('.')[0] + 'Z' | |
| print(f"Requesting data from {start_time_str} to {end_time_str}") | |
| # Open-Meteo Air Quality API | |
| air_quality_url = "https://air-quality-api.open-meteo.com/v1/air-quality" | |
| air_quality_params = { | |
| "latitude": latitude, | |
| "longitude": longitude, | |
| "hourly": ["pm2_5", "pm10", "carbon_monoxide"], | |
| "timezone": "UTC", | |
| "start_date": start_time.strftime('%Y-%m-%d'), # Use YYYY-MM-DD format | |
| "end_date": end_time.strftime('%Y-%m-%d'), | |
| "past_hours": fetch_hours | |
| } | |
| # Open-Meteo Historical Weather API for Temperature | |
| weather_url = "https://archive-api.open-meteo.com/v1/archive" | |
| weather_params = { | |
| "latitude": latitude, | |
| "longitude": longitude, | |
| "hourly": ["temperature_2m"], | |
| "timezone": "UTC", | |
| "start_date": start_time.strftime('%Y-%m-%d'), | |
| "end_date": end_time.strftime('%Y-%m-%d') | |
| } | |
| try: | |
| # Fetch Air Quality Data | |
| print(f"Fetching air quality data from: {air_quality_url}") | |
| air_quality_response = requests.get(air_quality_url, params=air_quality_params) | |
| air_quality_response.raise_for_status() | |
| air_quality_data = air_quality_response.json() | |
| print("Air quality data retrieved.") | |
| # Fetch Temperature Data | |
| print(f"Fetching temperature data from: {weather_url}") | |
| weather_response = requests.get(weather_url, params=weather_params) | |
| weather_response.raise_for_status() | |
| weather_data = weather_response.json() | |
| print("Temperature data retrieved.") | |
| print("Data fetched successfully.") | |
| # Process Air Quality Data | |
| if 'hourly' not in air_quality_data or 'time' not in air_quality_data['hourly']: | |
| print("Error: 'hourly' or 'time' key not found in air quality response.") | |
| return None, "Error: Invalid air quality data format from API." | |
| df_aq = pd.DataFrame(air_quality_data['hourly']) | |
| df_aq['time'] = pd.to_datetime(df_aq['time']) | |
| df_aq.set_index('time', inplace=True) | |
| # Process Temperature Data | |
| if 'hourly' not in weather_data or 'time' not in weather_data['hourly']: | |
| print("Error: 'hourly' or 'time' key not found in weather response.") | |
| return None, "Error: Invalid weather data format from API." | |
| df_temp = pd.DataFrame(weather_data['hourly']) | |
| df_temp['time'] = pd.to_datetime(df_temp['time']) | |
| df_temp.set_index('time', inplace=True) | |
| # Merge dataframes | |
| df_merged = df_aq.merge(df_temp, left_index=True, right_index=True, how='outer') | |
| print("DataFrames merged.") | |
| # Resample to ensure consistent hourly frequency and fill missing data | |
| # Use 'h' for hourly resampling | |
| df_processed = df_merged.resample('h').ffill().bfill() | |
| print(f"DataFrame resampled to hourly. Shape: {df_processed.shape}") | |
| # Rename columns to match internal naming convention | |
| df_processed.rename(columns={'pm2_5': 'pm25', 'carbon_monoxide': 'co', 'temperature_2m': 'temp'}, inplace=True) | |
| print("Renamed columns.") | |
| # Calculate AQI for the processed data | |
| df_processed['calculated_aqi'] = df_processed.apply(lambda row: calculate_overall_aqi(row, aqi_breakpoints), axis=1) | |
| print("Calculated AQI.") | |
| # Select and reorder columns to match training data order | |
| required_columns = ['calculated_aqi', 'temp', 'pm25', 'pm10', 'co'] | |
| # Ensure all required columns exist before selecting | |
| if not all(col in df_processed.columns for col in required_columns): | |
| missing_cols = [col for col in required_columns if col not in df_processed.columns] | |
| print(f"Error: Missing required columns after processing: {missing_cols}") | |
| return None, f"Error: Missing required data columns: {missing_cols}" | |
| df_processed = df_processed[required_columns].copy() | |
| print(f"Selected and reordered columns. Final processing shape: {df_processed.shape}") | |
| # Handle any remaining NaNs after ffill/bfill (e.g., if the very first values were NaN or API returned all NaNs) | |
| initial_rows = len(df_processed) | |
| df_processed.dropna(inplace=True) | |
| if len(df_processed) < initial_rows: | |
| print(f"Warning: Dropped {initial_rows - len(df_processed)} rows with remaining NaNs.") | |
| # Check if enough data points are available | |
| if len(df_processed) < sequence_length: | |
| print(f"Error: Only retrieved and processed {len(df_processed)} data points, but {sequence_length} are required.") | |
| return None, f"Error: Insufficient historical data ({len(df_processed)} points available, {sequence_length} required)." | |
| # Select the last `sequence_length` rows for the input sequence | |
| latest_data_sequence_df = df_processed.tail(sequence_length).copy() # Use .copy() to avoid SettingWithCopyWarning | |
| print(f"Selected last {sequence_length} data points.") | |
| # Convert to numpy array and reshape (1, sequence_length, num_features) | |
| latest_data_sequence = latest_data_sequence_df.values.reshape(1, sequence_length, len(required_columns)) | |
| # Get the timestamps for output formatting later | |
| timestamps = latest_data_sequence_df.index.tolist() | |
| print(f"Prepared input sequence with shape: {latest_data_sequence.shape}") | |
| return latest_data_sequence, timestamps # Return data and timestamps | |
| except requests.exceptions.RequestException as e: | |
| print(f"API Request Error: {e}") | |
| return None, f"API Request Error: {e}" | |
| except Exception as e: | |
| print(f"An unexpected error occurred during data retrieval and processing: {e}") | |
| traceback.print_exc() | |
| return None, f"An unexpected error occurred during data processing: {e}" | |
| # --- Define paths to your saved files --- | |
| # Use relative paths assuming files are in the root directory of the Space | |
| MODEL_PATH = 'best_model_TKAN_nahead_1.keras' | |
| INPUT_SCALER_PATH = 'input_scaler.pkl' | |
| TARGET_SCALER_PATH = 'target_scaler.pkl' # This should be the scaler for the ratio | |
| # Y_SCALER_TRAIN_PATH = 'y_scaler_train.pkl' # Keep commented out unless you find a specific use for it in the inverse transform | |
| # --- Load the scalers and model --- | |
| input_scaler = None | |
| target_scaler = None # Scaler for the AQI/rolling_median ratio | |
| model = None | |
| try: | |
| with open(INPUT_SCALER_PATH, 'rb') as f: | |
| input_scaler = pickle.load(f) | |
| print(f"Input scaler loaded successfully from {INPUT_SCALER_PATH}") | |
| with open(TARGET_SCALER_PATH, 'rb') as f: | |
| target_scaler = pickle.load(f) | |
| print(f"Target scaler (for ratio) loaded successfully from {TARGET_SCALER_PATH}") | |
| except FileNotFoundError as e: | |
| print(f"Error loading scaler files: {e}") | |
| print("Please ensure input_scaler.pkl and target_scaler.pkl are in the correct directory.") | |
| # These need to be loaded for the app to work, so we might let the startup fail or raise an error here. | |
| # For a web app, letting it fail on startup and show in logs is better than running with None scalers. | |
| # However, for the purpose of giving you the code structure, we'll just print and model=None below. | |
| except Exception as e: | |
| print(f"An unexpected error occurred during scaler loading: {e}") | |
| traceback.print_exc() | |
| # Load the trained model with custom_object_scope | |
| custom_objects = {"TKAN": TKAN} | |
| if TKAT is not None: | |
| custom_objects["TKAT"] = TKAT | |
| try: | |
| print(f"Loading model from {MODEL_PATH}...") | |
| # Use custom_object_scope to register custom layers during loading | |
| with custom_object_scope(custom_objects): | |
| # compile=False because we only need the model for inference | |
| model = load_model(MODEL_PATH, compile=False) | |
| print("Model loaded successfully.") | |
| except FileNotFoundError: | |
| print(f"Error: Model file not found at {MODEL_PATH}.") | |
| except ValueError as e: | |
| print(f"Error loading model (ValueError): {e}") | |
| print("This can happen if the file is not a valid Keras file or if custom objects are not registered.") | |
| traceback.print_exc() | |
| except Exception as e: | |
| print(f"An unexpected error occurred during model loading: {e}") | |
| traceback.print_exc() | |
| # Initialize FastAPI app | |
| app = FastAPI() | |
| # Define the structure of the prediction request body | |
| class PredictionRequest(BaseModel): | |
| latitude: float | |
| longitude: float | |
| pm25: float = None # Make current inputs optional, rely primarily on historical fetch | |
| pm10: float = None | |
| co: float = None | |
| temp: float = None | |
| n_ahead: int = 1 # Default prediction steps | |
| # Define the structure of the prediction response body | |
| class PredictionResponse(BaseModel): | |
| status: str # "success" or "error" | |
| message: str # Description of the result or error | |
| predictions: list = None # List of {"timestamp": "...", "aqi": ...} or None on error | |
| # Define the prediction endpoint | |
| async def predict_aqi_endpoint(request: PredictionRequest): | |
| # Check if model and scalers were loaded successfully on startup | |
| if model is None or input_scaler is None or target_scaler is None: | |
| print("API called but model or scalers are not loaded.") | |
| # Return a 500 Internal Server Error if dependencies failed to load | |
| raise HTTPException(status_code=500, detail="Model or scalers not loaded. Check server logs for details.") | |
| # Get the expected sequence length and number of features from the model's input shape | |
| # Assuming input shape is (None, sequence_length, num_features) | |
| if model.input_shape is None or len(model.input_shape) < 2: | |
| print(f"Error: Model has unexpected input shape: {model.input_shape}") | |
| raise HTTPException(status_code=500, detail=f"Model has unexpected input shape: {model.input_shape}") | |
| SEQUENCE_LENGTH = model.input_shape[1] | |
| NUM_FEATURES = model.input_shape[2] | |
| required_num_features = len(['calculated_aqi', 'temp', 'pm25', 'pm10', 'co']) | |
| if NUM_FEATURES != required_num_features: | |
| print(f"Error: Model expects {NUM_FEATURES} features, but data processing provides {required_num_features}.") | |
| raise HTTPException(status_code=500, detail=f"Model expects {NUM_FEATURES} features, but data processing provides {required_num_features}.") | |
| # Get the historical data sequence and its timestamps from Open-Meteo | |
| # The function now returns the data and a message (or error) | |
| latest_data_sequence_unscaled, message = get_latest_data_sequence(SEQUENCE_LENGTH, request.latitude, request.longitude) | |
| # Check if data retrieval was successful | |
| if latest_data_sequence_unscaled is None: | |
| # Return an error response if data fetching failed | |
| print(f"Data retrieval failed: {message}") | |
| return PredictionResponse(status="error", message=f"Data retrieval failed: {message}") | |
| # The timestamps returned are for the sequence itself. We need timestamps for the *predictions*. | |
| # The predictions are for n_ahead steps *after* the last timestamp in the sequence. | |
| prediction_timestamps = [] | |
| if message and isinstance(message, list) and len(message) > 0: # 'message' is actually 'timestamps' here | |
| last_timestamp_of_sequence = message[-1] # Get the last timestamp from the sequence | |
| for i in range(request.n_ahead): | |
| # Prediction i (0-indexed) is for hour i+1 after the last timestamp | |
| prediction_timestamps.append(last_timestamp_of_sequence + timedelta(hours=i + 1)) | |
| else: | |
| print("Warning: Could not get valid timestamps from data retrieval. Prediction timestamps will be approximate.") | |
| # Fallback: Approximate timestamps based on current time | |
| now_utc = datetime.now(pytz.utc) | |
| for i in range(request.n_ahead): | |
| prediction_timestamps.append(now_utc + timedelta(hours=i+1)) | |
| # Optional: Update the last timestep with current user inputs if provided | |
| # Check if current inputs were provided and are valid (not None or NaN) | |
| if request.pm25 is not None and not pd.isna(request.pm25) and \ | |
| request.pm10 is not None and not pd.isna(request.pm10) and \ | |
| request.co is not None and not pd.isna(request.co) and \ | |
| request.temp is not None and not pd.isna(request.temp): | |
| current_aqi = calculate_overall_aqi({'pm25': request.pm25, 'pm10': request.pm10, 'co': request.co, 'temp': request.temp}, aqi_breakpoints) | |
| if not pd.isna(current_aqi): | |
| # Assuming column order: 'calculated_aqi', 'temp', 'pm25', 'pm10', 'co' | |
| # Update the last row (-1) of the input sequence | |
| latest_data_sequence_unscaled[0, -1, 0] = current_aqi | |
| latest_data_sequence_unscaled[0, -1, 1] = request.temp | |
| latest_data_sequence_unscaled[0, -1, 2] = request.pm25 | |
| latest_data_sequence_unscaled[0, -1, 3] = request.pm10 | |
| latest_data_sequence_unscaled[0, -1, 4] = request.co | |
| print("Updated last timestep of input sequence with current user inputs.") | |
| else: | |
| print("Warning: Could not calculate AQI for current inputs. Last timestep remains historical.") | |
| # Scale the input data | |
| try: | |
| X_scaled = input_scaler.transform(latest_data_sequence_unscaled) | |
| print("Input data scaled successfully.") | |
| except Exception as e: | |
| print(f"Error scaling input data: {e}") | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail="Error processing input data for prediction (scaling).") | |
| # Make prediction | |
| try: | |
| scaled_prediction = model.predict(X_scaled, verbose=0) # Shape (1, n_ahead) | |
| print(f"Model prediction made. Scaled prediction shape: {scaled_prediction.shape}") | |
| except Exception as e: | |
| print(f"Error during model prediction: {e}") | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail="Error during model prediction.") | |
| # Inverse transform the prediction | |
| try: | |
| # --- Inverse Transformation Logic (Based on Rolling Median Scaling) --- | |
| # This part needs the actual rolling median for the future prediction timesteps. | |
| # Using an approximation based on the input sequence. | |
| if latest_data_sequence_unscaled.shape[1] > 0: | |
| # Get the 'calculated_aqi' values from the unscaled input sequence | |
| calculated_aqi_sequence = latest_data_sequence_unscaled[0, :, 0] # Assuming AQI is the first feature | |
| # Approximate the rolling median based on the last few points of the input sequence | |
| # This is a simple approximation. A more robust method might be needed. | |
| approx_rolling_median_proxy = np.mean(calculated_aqi_sequence[-min(5, SEQUENCE_LENGTH):]) | |
| if pd.isna(approx_rolling_median_proxy) or approx_rolling_median_proxy <= 0: | |
| approx_rolling_median_proxy = 1.0 # Prevent division by zero or invalid scaling | |
| # Create a placeholder scaler array for the future timesteps | |
| corresponding_rolling_median_scaler = np.full((1, request.n_ahead, 1), approx_rolling_median_proxy, dtype=np.float32) | |
| print(f"Approximated rolling median proxy for inverse transform: {approx_rolling_median_proxy:.2f}") | |
| # 1. Inverse transform the scaled prediction (ratio) using the target_scaler | |
| y_unscaled_pred_ratio = target_scaler.inverse_transform(scaled_prediction.reshape(1, request.n_ahead, 1)) | |
| print(f"Inverse transformed to ratio scale. Shape: {y_unscaled_pred_ratio.shape}") | |
| # 2. Multiply the unscaled ratio by the approximated rolling median scaler | |
| predicted_aqi_values = y_unscaled_pred_ratio * corresponding_rolling_median_scaler | |
| predicted_aqi_values = predicted_aqi_values.flatten() # Shape (n_ahead,) | |
| else: | |
| print("Error: Input sequence is empty, cannot perform inverse transform.") | |
| raise ValueError("Input sequence is empty.") | |
| print(f"Final predicted AQI values: {predicted_aqi_values}") | |
| except Exception as e: | |
| print(f"Error during inverse transformation: {e}") | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail="Error processing prediction results (inverse transform).") | |
| # Prepare the prediction output list | |
| predictions_list = [] | |
| for i in range(request.n_ahead): | |
| # Use the calculated prediction_timestamps | |
| timestamp_str = prediction_timestamps[i].strftime('%Y-%m-%d %H:%M:%S') | |
| predictions_list.append({ | |
| "timestamp": timestamp_str, | |
| "aqi": float(predicted_aqi_values[i]) # Ensure AQI is a standard float | |
| }) | |
| # Return the successful response | |
| return PredictionResponse(status="success", message="Prediction successful.", predictions=predictions_list) | |
| # Root endpoint for health check | |
| async def read_root(): | |
| return {"message": "AQI Prediction API is running."} |