nikethanreddy commited on
Commit
c552458
·
verified ·
1 Parent(s): 1ee468c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +86 -55
app.py CHANGED
@@ -1,4 +1,3 @@
1
-
2
  import os
3
  os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
4
  os.environ['JAX_PLATFORMS'] = 'cpu'
@@ -134,13 +133,17 @@ def calculate_overall_aqi(row, aqi_breakpoints):
134
  def get_latest_data_sequence(sequence_length: int, latitude: float, longitude: float):
135
  print(f"Attempting to retrieve data for the last {sequence_length} hours from Open-Meteo for Lat: {latitude}, Lon: {longitude}")
136
 
137
- fetch_hours = sequence_length + 5
 
 
 
 
 
138
 
139
- end_time_for_temp = datetime.now(pytz.utc)
140
- start_time_for_temp = end_time_for_temp - timedelta(hours=fetch_hours)
141
 
142
- print(f"Requesting data for the past {fetch_hours} hours for air quality.")
143
- print(f"Requesting temperature data from {start_time_for_temp.strftime('%Y-%m-%d %H:%M:%S UTC')} to {end_time_for_temp.strftime('%Y-%m-%d %H:%M:%S UTC')}")
144
 
145
  air_quality_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
146
  air_quality_params = {
@@ -148,19 +151,20 @@ def get_latest_data_sequence(sequence_length: int, latitude: float, longitude: f
148
  "longitude": longitude,
149
  "hourly": ["pm2_5", "pm10", "carbon_monoxide"],
150
  "timezone": "UTC",
151
- "past_hours": fetch_hours
152
  }
153
- print(f"Air quality API params: {air_quality_params}")
154
 
155
- weather_url = "https://api.open-meteo.com/v1/forecast"
 
156
  weather_params = {
157
  "latitude": latitude,
158
  "longitude": longitude,
159
  "hourly": ["temperature_2m"],
160
  "timezone": "UTC",
161
- "past_hours": fetch_hours
162
  }
163
- print(f"Temperature API params: {weather_params}")
164
 
165
  try:
166
  print(f"Fetching air quality data from: {air_quality_url}")
@@ -175,34 +179,42 @@ def get_latest_data_sequence(sequence_length: int, latitude: float, longitude: f
175
  weather_data = weather_response.json()
176
  print("Temperature data retrieved.")
177
 
178
- print("Data fetched successfully.")
179
 
180
  if 'hourly' not in air_quality_data or 'time' not in air_quality_data['hourly']:
181
  print("Error: 'hourly' or 'time' key not found in air quality response.")
182
  return None, "Error: Invalid air quality data format from API."
183
  df_aq = pd.DataFrame(air_quality_data['hourly'])
184
- if df_aq.empty or not all(col in df_aq.columns for col in ['time', 'pm2_5', 'pm10', 'carbon_monoxide']):
185
- print("Warning: Air quality data is empty or missing required columns ('time', 'pm2_5', 'pm10', 'carbon_monoxide') after fetching.")
186
- # Depending on how critical each pollutant is, you might allow continuation or return error
187
- # For now, let's be strict if key columns are missing from the structure
188
- if 'time' not in df_aq.columns:
189
- return None, "Error: 'time' column missing in air quality data."
190
- df_aq['time'] = pd.to_datetime(df_aq['time'])
191
- df_aq.set_index('time', inplace=True)
192
- print(f"Processed df_aq. Shape: {df_aq.shape}. Columns: {df_aq.columns.tolist()}")
193
-
 
194
 
195
  if 'hourly' not in weather_data or 'time' not in weather_data['hourly']:
196
  print("Error: 'hourly' or 'time' key not found in weather response.")
197
  return None, "Error: Invalid weather data format from API."
198
  df_temp = pd.DataFrame(weather_data['hourly'])
199
- if df_temp.empty or not all(col in df_temp.columns for col in ['time', 'temperature_2m']):
200
- print("Warning: Temperature data is empty or missing required columns ('time', 'temperature_2m') after fetching.")
201
- if 'time' not in df_temp.columns:
202
- return None, "Error: 'time' column missing in temperature data."
203
- df_temp['time'] = pd.to_datetime(df_temp['time'])
204
- df_temp.set_index('time', inplace=True)
205
- print(f"Processed df_temp. Shape: {df_temp.shape}. Columns: {df_temp.columns.tolist()}")
 
 
 
 
 
 
 
206
 
207
  df_merged = df_aq.merge(df_temp, left_index=True, right_index=True, how='inner')
208
  print(f"DataFrames merged (inner). Initial merged shape: {df_merged.shape}")
@@ -210,55 +222,71 @@ def get_latest_data_sequence(sequence_length: int, latitude: float, longitude: f
210
  print("Error: Inner merge of AQ and Temperature data resulted in an empty DataFrame. No overlapping timestamps with data.")
211
  return None, "Error: No overlapping AQ and Temperature data available for the period."
212
 
213
- df_processed = df_merged.resample('h').ffill().bfill()
214
- print(f"DataFrame resampled to hourly. Shape: {df_processed.shape}")
 
 
 
 
215
 
216
  df_processed.rename(columns={'pm2_5': 'pm25', 'carbon_monoxide': 'co', 'temperature_2m': 'temp'}, inplace=True)
217
  print(f"Renamed columns. Current columns: {df_processed.columns.tolist()}")
218
 
219
- # Ensure all expected columns exist after rename, before calculating AQI
220
- expected_cols_for_aqi = ['pm25', 'pm10', 'co'] # temp is also in df_processed
221
- missing_for_aqi = [col for col in expected_cols_for_aqi if col not in df_processed.columns]
222
- if missing_for_aqi:
223
- print(f"Warning: Missing columns required for AQI calculation after rename: {missing_for_aqi}. AQI might be NaN.")
224
- # Add missing columns with NaNs if they don't exist, so calculate_overall_aqi doesn't fail
225
- for col in missing_for_aqi:
226
  df_processed[col] = np.nan
227
-
228
-
229
  df_processed['calculated_aqi'] = df_processed.apply(lambda row: calculate_overall_aqi(row, aqi_breakpoints), axis=1)
230
  print("Calculated AQI.")
 
 
231
 
232
  required_columns = ['calculated_aqi', 'temp', 'pm25', 'pm10', 'co']
233
- # Ensure all required columns exist before selecting, add if missing to prevent KeyError
234
  for col in required_columns:
235
  if col not in df_processed.columns:
236
  print(f"Warning: Column '{col}' is missing before final selection. Adding it as NaN.")
237
  df_processed[col] = np.nan
238
 
239
  df_processed = df_processed[required_columns].copy()
240
- print(f"Selected and reordered columns. Current shape: {df_processed.shape}. Columns: {df_processed.columns.tolist()}")
241
 
242
- initial_rows = len(df_processed)
243
- df_processed.dropna(inplace=True)
244
- if len(df_processed) < initial_rows:
245
- print(f"Warning: Dropped {initial_rows - len(df_processed)} rows with remaining NaNs after all processing.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
246
 
247
- if len(df_processed) < sequence_length:
248
- print(f"Error: Only {len(df_processed)} valid data points remain after processing, but {sequence_length} are required.")
249
- return None, f"Error: Insufficient historical data ({len(df_processed)} points available, {sequence_length} required)."
250
 
251
- latest_data_sequence_df = df_processed.tail(sequence_length).copy()
252
- print(f"Selected last {sequence_length} data points for model input.")
253
 
254
  latest_data_sequence = latest_data_sequence_df.values.reshape(1, sequence_length, len(required_columns))
255
  timestamps = latest_data_sequence_df.index.tolist()
256
- print(f"Prepared input sequence with shape: {latest_data_sequence.shape}")
257
 
258
  return latest_data_sequence, timestamps
259
 
260
  except requests.exceptions.RequestException as e:
261
  print(f"API Request Error: {e}")
 
262
  return None, f"API Request Error: {e}"
263
  except Exception as e:
264
  print(f"An unexpected error occurred during data retrieval and processing: {e}")
@@ -384,15 +412,18 @@ async def predict_aqi_endpoint(request: PredictionRequest):
384
 
385
  current_aqi = calculate_overall_aqi({'pm25': request.pm25, 'pm10': request.pm10, 'co': request.co, 'temp': request.temp}, aqi_breakpoints)
386
 
387
- if not pd.isna(current_aqi):
388
  latest_data_sequence_unscaled[0, -1, 0] = current_aqi
389
  latest_data_sequence_unscaled[0, -1, 1] = request.temp
390
  latest_data_sequence_unscaled[0, -1, 2] = request.pm25
391
  latest_data_sequence_unscaled[0, -1, 3] = request.pm10
392
  latest_data_sequence_unscaled[0, -1, 4] = request.co
393
  print("Updated last timestep of input sequence with current user inputs.")
394
- else:
395
  print("Warning: Could not calculate AQI for current inputs. Last timestep remains historical.")
 
 
 
396
 
397
  try:
398
  X_scaled = input_scaler.transform(latest_data_sequence_unscaled)
@@ -449,4 +480,4 @@ async def predict_aqi_endpoint(request: PredictionRequest):
449
 
450
  @app.get("/")
451
  async def read_root():
452
- return {"message": "AQI Prediction API is running."}
 
 
1
  import os
2
  os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
3
  os.environ['JAX_PLATFORMS'] = 'cpu'
 
133
  def get_latest_data_sequence(sequence_length: int, latitude: float, longitude: float):
134
  print(f"Attempting to retrieve data for the last {sequence_length} hours from Open-Meteo for Lat: {latitude}, Lon: {longitude}")
135
 
136
+ current_utc_time = datetime.now(pytz.utc)
137
+ print(f"Current UTC time on server for API calls: {current_utc_time.strftime('%Y-%m-%d %H:%M:%S UTC')}")
138
+
139
+ # Define a window to fetch from APIs, slightly larger than sequence_length to allow for finding complete data
140
+ # e.g., if sequence_length is 24, fetch last 48 hours to have a good buffer
141
+ api_fetch_past_hours = sequence_length + 24 # Fetch a wider window, e.g., 48 hours for a 24-hour sequence
142
 
143
+ # This window will be used to filter the processed data before dropna and tail
144
+ processing_window_hours = sequence_length + 24 # e.g., 48 hours
145
 
146
+ print(f"Requesting data for the past {api_fetch_past_hours} hours for air quality and temperature from APIs.")
 
147
 
148
  air_quality_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
149
  air_quality_params = {
 
151
  "longitude": longitude,
152
  "hourly": ["pm2_5", "pm10", "carbon_monoxide"],
153
  "timezone": "UTC",
154
+ "past_hours": api_fetch_past_hours
155
  }
156
+ # print(f"Air quality API params: {air_quality_params}")
157
 
158
+ # Using forecast API for temperature as per user's finding that it works better
159
+ weather_url = "https://api.open-meteo.com/v1/forecast"
160
  weather_params = {
161
  "latitude": latitude,
162
  "longitude": longitude,
163
  "hourly": ["temperature_2m"],
164
  "timezone": "UTC",
165
+ "past_hours": api_fetch_past_hours # Fetch same window for temperature
166
  }
167
+ # print(f"Temperature API params: {weather_params}")
168
 
169
  try:
170
  print(f"Fetching air quality data from: {air_quality_url}")
 
179
  weather_data = weather_response.json()
180
  print("Temperature data retrieved.")
181
 
182
+ print("Data fetched successfully from APIs.")
183
 
184
  if 'hourly' not in air_quality_data or 'time' not in air_quality_data['hourly']:
185
  print("Error: 'hourly' or 'time' key not found in air quality response.")
186
  return None, "Error: Invalid air quality data format from API."
187
  df_aq = pd.DataFrame(air_quality_data['hourly'])
188
+ if df_aq.empty:
189
+ print("Warning: Air quality data DataFrame is empty after fetching.")
190
+ # Continue if not empty, but columns might be missing
191
+ if not df_aq.empty and not all(col in df_aq.columns for col in ['time', 'pm2_5', 'pm10', 'carbon_monoxide']):
192
+ print("Warning: Air quality data is missing some expected columns ('time', 'pm2_5', 'pm10', 'carbon_monoxide') after fetching.")
193
+ if 'time' not in df_aq.columns and not df_aq.empty:
194
+ return None, "Error: 'time' column missing in air quality data."
195
+ if not df_aq.empty:
196
+ df_aq['time'] = pd.to_datetime(df_aq['time'])
197
+ df_aq.set_index('time', inplace=True)
198
+ print(f"Processed df_aq. Shape: {df_aq.shape}. Columns: {df_aq.columns.tolist() if not df_aq.empty else 'N/A'}")
199
 
200
  if 'hourly' not in weather_data or 'time' not in weather_data['hourly']:
201
  print("Error: 'hourly' or 'time' key not found in weather response.")
202
  return None, "Error: Invalid weather data format from API."
203
  df_temp = pd.DataFrame(weather_data['hourly'])
204
+ if df_temp.empty:
205
+ print("Warning: Temperature data DataFrame is empty after fetching.")
206
+ if not df_temp.empty and not all(col in df_temp.columns for col in ['time', 'temperature_2m']):
207
+ print("Warning: Temperature data is missing some expected columns ('time', 'temperature_2m') after fetching.")
208
+ if 'time' not in df_temp.columns and not df_temp.empty:
209
+ return None, "Error: 'time' column missing in temperature data."
210
+ if not df_temp.empty:
211
+ df_temp['time'] = pd.to_datetime(df_temp['time'])
212
+ df_temp.set_index('time', inplace=True)
213
+ print(f"Processed df_temp. Shape: {df_temp.shape}. Columns: {df_temp.columns.tolist() if not df_temp.empty else 'N/A'}")
214
+
215
+ if df_aq.empty or df_temp.empty:
216
+ print("Error: One or both dataframes (AQ, Temp) are empty before merge. Cannot proceed.")
217
+ return None, "Error: Insufficient data from APIs (AQ or Temp empty)."
218
 
219
  df_merged = df_aq.merge(df_temp, left_index=True, right_index=True, how='inner')
220
  print(f"DataFrames merged (inner). Initial merged shape: {df_merged.shape}")
 
222
  print("Error: Inner merge of AQ and Temperature data resulted in an empty DataFrame. No overlapping timestamps with data.")
223
  return None, "Error: No overlapping AQ and Temperature data available for the period."
224
 
225
+ # Resample to ensure consistent hourly frequency and fill missing data
226
+ df_processed = df_merged.resample('h').mean() # Use mean for resampling to handle potential duplicates at same hour
227
+ df_processed = df_processed.ffill().bfill() # Then fill
228
+ print(f"DataFrame resampled to hourly, filled NaNs. Shape: {df_processed.shape}")
229
+ # print(f"df_processed head after resample/ffill/bfill:\n{df_processed.head().to_string()}")
230
+ # print(f"df_processed NaNs after resample/ffill/bfill:\n{df_processed.isna().sum().to_string()}")
231
 
232
  df_processed.rename(columns={'pm2_5': 'pm25', 'carbon_monoxide': 'co', 'temperature_2m': 'temp'}, inplace=True)
233
  print(f"Renamed columns. Current columns: {df_processed.columns.tolist()}")
234
 
235
+ expected_cols_for_aqi = ['pm25', 'pm10', 'co']
236
+ for col in expected_cols_for_aqi:
237
+ if col not in df_processed.columns:
238
+ print(f"Warning: Column '{col}' for AQI calculation is missing after rename. Adding as NaN.")
 
 
 
239
  df_processed[col] = np.nan
240
+
 
241
  df_processed['calculated_aqi'] = df_processed.apply(lambda row: calculate_overall_aqi(row, aqi_breakpoints), axis=1)
242
  print("Calculated AQI.")
243
+ # print(f"df_processed head after AQI calculation:\n{df_processed.head().to_string()}")
244
+ # print(f"df_processed NaNs after AQI calculation:\n{df_processed.isna().sum().to_string()}")
245
 
246
  required_columns = ['calculated_aqi', 'temp', 'pm25', 'pm10', 'co']
 
247
  for col in required_columns:
248
  if col not in df_processed.columns:
249
  print(f"Warning: Column '{col}' is missing before final selection. Adding it as NaN.")
250
  df_processed[col] = np.nan
251
 
252
  df_processed = df_processed[required_columns].copy()
253
+ # print(f"Selected and reordered columns. Shape before windowing: {df_processed.shape}. Columns: {df_processed.columns.tolist()}")
254
 
255
+ # Filter to the defined processing window relative to current time
256
+ # Ensure we only consider data up to the current hour and back by processing_window_hours
257
+ window_start_time = current_utc_time.replace(minute=0, second=0, microsecond=0) - timedelta(hours=processing_window_hours - 1)
258
+ window_end_time = current_utc_time.replace(minute=0, second=0, microsecond=0)
259
+
260
+ df_recent_processed = df_processed[(df_processed.index >= window_start_time) & (df_processed.index <= window_end_time)].copy()
261
+ print(f"Filtered to recent processing window ({processing_window_hours}hrs). Shape: {df_recent_processed.shape}")
262
+ # print(f"df_recent_processed head:\n{df_recent_processed.head().to_string()}")
263
+ # print(f"df_recent_processed NaNs before dropna:\n{df_recent_processed.isna().sum().to_string()}")
264
+
265
+
266
+ initial_rows_recent = len(df_recent_processed)
267
+ df_recent_processed.dropna(inplace=True)
268
+ if len(df_recent_processed) < initial_rows_recent:
269
+ print(f"Warning: Dropped {initial_rows_recent - len(df_recent_processed)} rows with NaNs from the recent processing window.")
270
+ print(f"Shape after dropna on recent window: {df_recent_processed.shape}")
271
+
272
+ if len(df_recent_processed) < sequence_length:
273
+ print(f"Error: Only {len(df_recent_processed)} valid data points remain in the recent window after processing, but {sequence_length} are required.")
274
+ return None, f"Error: Insufficient historical data in the recent window ({len(df_recent_processed)} points available, {sequence_length} required)."
275
 
276
+ latest_data_sequence_df = df_recent_processed.tail(sequence_length).copy()
277
+ print(f"Selected last {sequence_length} data points for model input. Shape: {latest_data_sequence_df.shape}")
278
+ # print(f"Final sequence data:\n{latest_data_sequence_df.to_string()}")
279
 
 
 
280
 
281
  latest_data_sequence = latest_data_sequence_df.values.reshape(1, sequence_length, len(required_columns))
282
  timestamps = latest_data_sequence_df.index.tolist()
283
+ # print(f"Prepared input sequence with shape: {latest_data_sequence.shape}")
284
 
285
  return latest_data_sequence, timestamps
286
 
287
  except requests.exceptions.RequestException as e:
288
  print(f"API Request Error: {e}")
289
+ traceback.print_exc()
290
  return None, f"API Request Error: {e}"
291
  except Exception as e:
292
  print(f"An unexpected error occurred during data retrieval and processing: {e}")
 
412
 
413
  current_aqi = calculate_overall_aqi({'pm25': request.pm25, 'pm10': request.pm10, 'co': request.co, 'temp': request.temp}, aqi_breakpoints)
414
 
415
+ if not pd.isna(current_aqi) and latest_data_sequence_unscaled.shape[1] == SEQUENCE_LENGTH : # Ensure sequence is correctly shaped
416
  latest_data_sequence_unscaled[0, -1, 0] = current_aqi
417
  latest_data_sequence_unscaled[0, -1, 1] = request.temp
418
  latest_data_sequence_unscaled[0, -1, 2] = request.pm25
419
  latest_data_sequence_unscaled[0, -1, 3] = request.pm10
420
  latest_data_sequence_unscaled[0, -1, 4] = request.co
421
  print("Updated last timestep of input sequence with current user inputs.")
422
+ elif pd.isna(current_aqi):
423
  print("Warning: Could not calculate AQI for current inputs. Last timestep remains historical.")
424
+ else:
425
+ print("Warning: Sequence not correctly shaped to update with current user inputs, or current_aqi is NaN.")
426
+
427
 
428
  try:
429
  X_scaled = input_scaler.transform(latest_data_sequence_unscaled)
 
480
 
481
  @app.get("/")
482
  async def read_root():
483
+ return {"message": "AQI Prediction API is running."}