# env-data / app.py
# aromidvar1355's picture
# Update app.py
# 6d957dc verified
import os
import streamlit as st
import pandas as pd
import requests
import datetime
import plotly.express as px
import json
import folium
from streamlit_folium import folium_static
import numpy as np
import time
from datetime import datetime, timedelta
import pytz
import logging
import io
# Set up logging
logging.basicConfig(level=logging.DEBUG)

# --- API KEYS ---
# Environment variables take precedence so a deployment can inject its own
# credentials; the literals are kept only as a backward-compatible fallback.
# NOTE(review): the fallback keys are committed to source control -- rotate
# them and move them into the hosting platform's secret store.
OPENWEATHER_API_KEY = (
    os.environ.get("OPENWEATHER_API_KEY")
    or "c6c267b301a145ddec9b381e7d87a5af"
)
STORMGLASS_API_KEY = (
    os.environ.get("STORMGLASS_API_KEY")
    or "bcabf6a8-0641-11f0-a4a9-0242ac130003-bcabf72a-0641-11f0-a4a9-0242ac130003"
)

# --- PAGE CONFIGURATION ---
st.set_page_config(page_title="Environmental Data Dashboard", layout="wide")

# --- SIDEBAR ---
st.sidebar.title("🌍 Environmental Data Dashboard")
data_source = st.sidebar.selectbox(
    "Select Data Source",
    ["Weather Data", "Marine", "Solunar & Tide Data", "Fish Categories Data"],
)

# Module-level lists for each data source. Streamlit re-executes this script
# on every interaction, so these are recreated empty on each rerun; data that
# must survive a rerun lives in st.session_state (initialised below).
openweather_historical_readings = []
openweather_forecast_readings = []
stormglass_marine_readings = []
solunar_tide_readings = []

# Initialize session state for historical weather data
if 'openweather_historical_readings' not in st.session_state:
    st.session_state.openweather_historical_readings = []

# Initialize session state for marine data
if 'stormglass_marine_readings' not in st.session_state:
    st.session_state.stormglass_marine_readings = []

# Initialize session state for solunar and tide data
if 'solunar_tide_readings' not in st.session_state:
    st.session_state.solunar_tide_readings = []
# --- API CALLING FUNCTIONS ---
def fetch_openweather_data(lat, lon, datetime_input):
    """Fetch weather for one instant from OpenWeather's One Call "timemachine" endpoint.

    The first reading of the response (or an all-``None`` record when the
    response carries no readings) is flattened and appended to
    ``st.session_state.openweather_historical_readings`` for later CSV export.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.
        datetime_input: ``datetime`` whose ``timestamp()`` is sent as ``dt``.

    Returns:
        The decoded JSON response, or ``None`` when the request or the
        processing failed (the error is shown with ``st.error``).
    """
    try:
        url = f"https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={lat}&lon={lon}&dt={int(datetime_input.timestamp())}&appid={OPENWEATHER_API_KEY}&units=imperial"
        # A timeout keeps a stalled connection from blocking the app forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()

        readings = data.get('data') or []
        current_weather_data = readings[0] if readings else {}

        # Sunrise/sunset arrive as UNIX timestamps; convert when present.
        sunrise = current_weather_data.get('sunrise')
        sunset = current_weather_data.get('sunset')
        sunrise_dt = datetime.fromtimestamp(sunrise) if sunrise else None
        sunset_dt = datetime.fromtimestamp(sunset) if sunset else None

        processed_entry = {
            'timestamp': datetime_input,
            'latitude': lat,
            'longitude': lon,
            'temperature': current_weather_data.get('temp'),
            'feels_like': current_weather_data.get('feels_like'),
            'humidity': current_weather_data.get('humidity'),
            'wind_speed': current_weather_data.get('wind_speed'),
            'wind_direction': current_weather_data.get('wind_deg'),
            'cloud_cover': current_weather_data.get('clouds'),
            'pressure': current_weather_data.get('pressure'),
            'dew_point': current_weather_data.get('dew_point'),
            'sunrise': sunrise_dt,
            'sunset': sunset_dt,
        }

        # Session state survives Streamlit reruns, unlike module globals.
        history = st.session_state.openweather_historical_readings
        history.append(processed_entry)

        # Log before returning so the diagnostics are actually emitted.
        logging.debug("Processed Entry: %s", processed_entry)
        logging.debug("Historical data count: %d", len(history))
        return data
    except Exception as err:
        st.error(f"An error occurred: {err}")
        return None
def fetch_openweather_forecast(lat, lon):
    """Fetch the daily weather forecast from OpenWeather's One Call API.

    Each entry of the response's ``daily`` list is summarised and appended to
    the module-level ``openweather_forecast_readings`` list.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.

    Returns:
        The decoded JSON response, or ``None`` when the request failed (the
        error is shown with ``st.error``).
    """
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&exclude=current,minutely,hourly,alerts&appid={OPENWEATHER_API_KEY}&units=imperial"
    try:
        # Without a timeout a stalled connection would hang the app; a
        # timeout surfaces as a RequestException and is handled below.
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Raise HTTPError for bad responses
        data = response.json()

        # Store the forecast data in the global list
        if data and 'daily' in data:
            for day_data in data['daily']:
                temperatures = day_data.get('temp', {})
                openweather_forecast_readings.append({
                    'date': day_data.get('dt'),
                    'latitude': lat,
                    'longitude': lon,
                    'temperature_day': temperatures.get('day'),
                    'temperature_night': temperatures.get('night'),
                    'humidity': day_data.get('humidity'),
                    'wind_speed': day_data.get('wind_speed'),
                })
        return data
    except requests.exceptions.RequestException as e:
        st.error(f"Error fetching OpenWeather forecast data: {e}")
        return None
# Fields copied from every Stormglass hour into the stored reading.
_STORMGLASS_FIELDS = (
    'waveHeight', 'wavePeriod', 'waveDirection',
    'windSpeed', 'windDirection', 'airTemperature',
    'waterTemperature', 'seaLevel', 'humidity',
    'precipitation', 'visibility', 'currentSpeed',
    'currentDirection',
)


def _stormglass_reading(hour_data, field):
    """Collapse one field of an hour (a source -> value mapping) to one number."""
    sources = hour_data.get(field)
    if not isinstance(sources, dict):
        return None
    # Keep honouring an explicit 'value' entry should the payload carry one.
    if sources.get('value') is not None:
        return sources['value']
    # Otherwise average across sources, the same way main() builds "<param>_avg".
    values = [v for v in sources.values() if isinstance(v, (int, float))]
    return sum(values) / len(values) if values else None


def fetch_stormglass_data(lat, lng, start_datetime, end_datetime, selected_params):
    """Fetch hourly marine data from the Stormglass weather endpoint.

    One flattened record per returned hour is appended to
    ``st.session_state.stormglass_marine_readings``.

    Args:
        lat: Latitude of the location.
        lng: Longitude of the location.
        start_datetime: Start of the range; sent as ``isoformat()``.
        end_datetime: End of the range; sent as ``isoformat()``.
        selected_params: Iterable of Stormglass parameter names to request.

    Returns:
        The decoded JSON response, or ``None`` when the request failed (the
        error is shown with ``st.error``).
    """
    try:
        query = {
            'lat': lat,
            'lng': lng,
            'params': ','.join(selected_params),
            'start': start_datetime.isoformat(),
            'end': end_datetime.isoformat(),
        }
        headers = {'Authorization': STORMGLASS_API_KEY}
        # `params=` lets requests URL-encode the values; the timeout keeps a
        # stalled connection from hanging the app.
        response = requests.get(
            "https://api.stormglass.io/v2/weather/point",
            headers=headers,
            params=query,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()

        if data and 'hours' in data:
            for hour_data in data['hours']:
                processed_entry = {
                    'timestamp': hour_data.get('time'),
                    'latitude': lat,
                    'longitude': lng,
                }
                for field in _STORMGLASS_FIELDS:
                    processed_entry[field] = _stormglass_reading(hour_data, field)
                st.session_state.stormglass_marine_readings.append(processed_entry)
        return data
    except requests.exceptions.RequestException as e:
        st.error(f"Error fetching Stormglass marine data: {e}")
        return None
def _stormglass_get_json(url, headers, params):
    """GET ``url`` and return its JSON body, or ``None`` (logged) on a non-200 status."""
    response = requests.get(url, headers=headers, params=params, timeout=30)
    if response.status_code == 200:
        return response.json()
    logging.warning(
        "Stormglass request to %s failed with HTTP %s", url, response.status_code
    )
    return None


def fetch_solunar_tide_data(lat, lon, datetime_input):
    """Fetch tide extremes and astronomy data from Stormglass for one day.

    When at least one of the two requests succeeds, a record holding both raw
    payloads is appended to ``st.session_state.solunar_tide_readings``.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.
        datetime_input: ``datetime`` whose date selects the day to query.

    Returns:
        ``{'tide': ..., 'astronomy': ...}`` (either value may be ``None``)
        when at least one request succeeded, otherwise ``None``. Exceptions
        are shown with ``st.error`` and also yield ``None``.
    """
    try:
        headers = {'Authorization': STORMGLASS_API_KEY}
        day = datetime_input.date()
        # The endpoints select their window with start/end timestamps, so the
        # requested day is sent as its UTC midnight-to-midnight range.
        day_start = datetime(day.year, day.month, day.day, tzinfo=pytz.utc)
        day_end = day_start + timedelta(days=1)
        params = {
            'lat': lat,
            'lng': lon,
            'start': int(day_start.timestamp()),
            'end': int(day_end.timestamp()),
        }

        tide_data = _stormglass_get_json(
            'https://api.stormglass.io/v2/tide/extremes/point', headers, params
        )
        astronomy_data = _stormglass_get_json(
            'https://api.stormglass.io/v2/astronomy/point', headers, params
        )

        if tide_data or astronomy_data:
            st.session_state.solunar_tide_readings.append({
                'date': day.isoformat(),
                'latitude': lat,
                'longitude': lon,
                'tide_data': tide_data,
                'astronomy_data': astronomy_data,
            })
            return {
                'tide': tide_data,
                'astronomy': astronomy_data,
            }
        return None
    except Exception as e:
        st.error(f"Error fetching Solunar and Tide data: {e}")
        return None
def save_openweather_historical_to_csv():
    """Serialise the session's historical OpenWeather readings to CSV text.

    Reads ``st.session_state.openweather_historical_readings`` and formats
    the ``timestamp``, ``sunrise`` and ``sunset`` columns as
    ``YYYY-MM-DD HH:MM:SS`` strings; missing values stay empty.

    Returns:
        The CSV document as a string, or ``None`` when there is nothing to
        export or the conversion failed (the error is logged and shown with
        ``st.error``).
    """
    try:
        readings = st.session_state.openweather_historical_readings

        # Validate input data
        if not readings:
            raise ValueError("No historical data available.")
        if not isinstance(readings, list):
            raise ValueError("Historical readings must be a list of dictionaries.")

        df = pd.DataFrame(readings)
        if df.empty:
            raise ValueError("DataFrame is empty.")

        def _format_moment(value):
            # pandas turns a missing datetime into NaT, which is not None and
            # cannot be formatted, so test with pd.notna instead.
            if pd.notna(value):
                return value.strftime('%Y-%m-%d %H:%M:%S')
            return None

        for col in ('timestamp', 'sunrise', 'sunset'):
            if col in df.columns:
                df[col] = df[col].apply(_format_moment)

        logging.debug("DataFrame Preview:")
        logging.debug(df.head())

        # Render the CSV into an in-memory buffer and hand back its text.
        csv_buffer = io.StringIO()
        df.to_csv(csv_buffer, index=False)
        csv_data = csv_buffer.getvalue()
        logging.debug("CSV data generated successfully.")
        return csv_data
    except ValueError as ve:
        logging.error(f"Validation error: {ve}")
        st.error(f"Validation error: {ve}")
        return None
    except Exception as e:
        logging.error(f"Error saving historical data: {str(e)}")
        st.error(f"Error saving historical data: {str(e)}")
        return None
def save_stormglass_marine_to_csv():
    """Build a DataFrame from the Stormglass marine readings of this session.

    Despite the name, nothing is written to disk; the caller receives the
    DataFrame. The readings come from
    ``st.session_state.stormglass_marine_readings``, which is where
    ``fetch_stormglass_data`` stores them.

    Returns:
        A ``pandas.DataFrame`` with one row per stored reading, or ``None``
        when nothing has been collected (an error is shown with ``st.error``).
    """
    readings = st.session_state.get('stormglass_marine_readings', [])
    if not readings:
        st.error("No marine data to save!")
        return None
    return pd.DataFrame(readings)
def save_solunar_tide_to_csv():
    """Build a DataFrame from the solunar/tide readings of this session.

    Despite the name, nothing is written to disk; the caller receives the
    DataFrame. The readings come from
    ``st.session_state.solunar_tide_readings``, which is where
    ``fetch_solunar_tide_data`` stores them.

    Returns:
        A ``pandas.DataFrame`` with one row per stored reading, or ``None``
        when nothing has been collected (an error is shown with ``st.error``).
    """
    readings = st.session_state.get('solunar_tide_readings', [])
    if not readings:
        st.error("No solunar or tide data to save!")
        return None
    return pd.DataFrame(readings)
# --- MAIN APPLICATION LOGIC ---
def _format_epoch(epoch_seconds, tz):
    """Format a UNIX timestamp in ``tz``; return 'N/A' when it is missing."""
    if epoch_seconds is None:
        return 'N/A'
    return datetime.fromtimestamp(epoch_seconds, tz).strftime('%Y-%m-%d %H:%M:%S')


def _render_weather_reading(data, lat, lon, datetime_input):
    """Show metrics, sun times, conditions and a map for the first reading in ``data``."""
    readings = data.get('data') or []
    if not readings:
        return
    current = readings[0]

    # Sunrise/sunset are shown in the timezone named by the response.
    tz = pytz.timezone(data.get('timezone', 'UTC'))
    sunrise_time = _format_epoch(current.get('sunrise'), tz)
    sunset_time = _format_epoch(current.get('sunset'), tz)

    st.subheader(f"Weather at {lat}, {lon} on {datetime_input}")
    weather_col1, weather_col2 = st.columns(2)
    with weather_col1:
        st.metric("Temperature", f"{current.get('temp', 'N/A')}°F")
        st.metric("Feels Like", f"{current.get('feels_like', 'N/A')}°F")
        st.metric("Humidity", f"{current.get('humidity', 'N/A')}%")
        # OpenWeather reports pressure in hPa whatever `units` is requested.
        st.metric("Pressure", f"{current.get('pressure', 'N/A')} hPa")
        st.metric("Dew Point", f"{current.get('dew_point', 'N/A')} °F")
    with weather_col2:
        st.metric("Wind Speed", f"{current.get('wind_speed', 'N/A')} mph")
        # wind_deg is a compass bearing in degrees.
        st.metric("Wind Direction", f"{current.get('wind_deg', 'N/A')}°")
        st.metric("Cloud Cover", f"{current.get('clouds', 'N/A')}%")

    st.write(f"**Sunrise:** {sunrise_time}")
    st.write(f"**Sunset:** {sunset_time}")

    conditions = current.get('weather')
    if conditions:
        st.write(f"**Conditions:** {conditions[0]['description']}")

    # Map display
    m = folium.Map(location=[lat, lon], zoom_start=10)
    folium.Marker(
        location=[lat, lon],
        popup=f"{lat}, {lon}",
        icon=folium.Icon(color="red"),
    ).add_to(m)
    folium_static(m)


def _render_forecast(forecast_data, lat, lon):
    """Plot the daily temperature forecast and list each day's details."""
    st.subheader(f"7-Day Weather Forecast for {lat}, {lon}")

    forecast_list = [
        {
            'date': datetime.fromtimestamp(daily_forecast['dt']),
            'temperature_day': daily_forecast['temp']['day'],
            'temperature_night': daily_forecast['temp']['night'],
            'feels_like_day': daily_forecast['feels_like']['day'],
            'humidity': daily_forecast['humidity'],
            'wind_speed': daily_forecast['wind_speed'],
            'description': daily_forecast['weather'][0]['description'],
        }
        for daily_forecast in forecast_data['daily']
    ]
    forecast_df = pd.DataFrame(forecast_list)

    fig_temp = px.line(
        forecast_df,
        x='date',
        y=['temperature_day', 'temperature_night'],
        title="Daily Temperature Forecast",
        labels={'value': 'Temperature (°F)', 'date': 'Date'},
        color_discrete_map={'temperature_day': 'red', 'temperature_night': 'blue'},
    )
    st.plotly_chart(fig_temp, use_container_width=True)

    for forecast in forecast_list:
        col_date, col_details = st.columns([1, 3])
        with col_date:
            st.write(f"**{forecast['date'].strftime('%Y-%m-%d')}**")
        with col_details:
            st.write(f"Day Temp: {forecast['temperature_day']}°F | Night Temp: {forecast['temperature_night']}°F")
            st.write(f"Conditions: {forecast['description']}")
            st.write(f"Humidity: {forecast['humidity']}% | Wind: {forecast['wind_speed']} mph")


def _render_weather_page():
    """Render the OpenWeather page: point-in-time lookup, CSV export and forecast."""
    st.title("OpenWeather Data")
    col1, col2 = st.columns(2)

    with col1:
        st.subheader("Location and Time Settings")
        lat = st.number_input("Latitude", value=25.7617, step=0.001, format="%.4f")
        lon = st.number_input("Longitude", value=-80.1918, step=0.001, format="%.4f")

        # The chosen date/time is kept in session state so it survives reruns.
        if 'datetime_input' not in st.session_state:
            st.session_state['datetime_input'] = datetime.now()
        stored = st.session_state['datetime_input']
        input_date = st.date_input("Date", value=stored.date())
        input_time = st.time_input("Time", value=stored.time())
        datetime_input = datetime.combine(input_date, input_time)
        st.session_state['datetime_input'] = datetime_input

        if st.button("Get Current Weather"):
            with st.spinner("Fetching data..."):
                data = fetch_openweather_data(lat, lon, datetime_input)
            if data and 'data' in data:
                _render_weather_reading(data, lat, lon, datetime_input)

        # Download button for historical data
        if st.button("Download Historical Weather Data"):
            # The helper reports its own errors, so a None result needs no
            # second message here.
            csv_data = save_openweather_historical_to_csv()
            if csv_data:
                try:
                    st.download_button(
                        label="Download Historical Weather Data CSV",
                        data=csv_data,
                        file_name=f"openweather_historical_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                        mime="text/csv",
                    )
                    st.success("Historical weather data saved and ready for download!")
                except Exception as e:
                    st.error(f"Error preparing download: {str(e)}")

    with col2:
        if st.button("Get Weather Forecast"):
            with st.spinner("Fetching forecast data..."):
                forecast_data = fetch_openweather_forecast(lat, lon)
            if forecast_data and 'daily' in forecast_data:
                _render_forecast(forecast_data, lat, lon)


def _render_marine_results(hours, selected_params):
    """Tabulate, plot and summarise the Stormglass ``hours`` for the chosen parameters."""
    # Each parameter of an hour maps source name -> value; keep every source
    # and add a per-hour average across the sources that reported a value.
    processed_data = []
    for hour_data in hours:
        hour_record = {'time': hour_data['time']}
        for param in selected_params:
            sources = hour_data.get(param)
            if not isinstance(sources, dict):
                continue
            values = [v for v in sources.values() if v is not None]
            if values:
                hour_record[f"{param}_avg"] = sum(values) / len(values)
            for source, value in sources.items():
                if value is not None:
                    hour_record[f"{param}_{source}"] = value
        processed_data.append(hour_record)

    sg_df = pd.DataFrame(processed_data)
    sg_df['time'] = pd.to_datetime(sg_df['time'])

    with st.expander("Show Raw Data"):
        st.dataframe(sg_df)

    # One chart per parameter: the average plus a trace for every source.
    for param in selected_params:
        avg_col = f"{param}_avg"
        if avg_col not in sg_df.columns:
            continue
        param_sources = [
            col for col in sg_df.columns
            if col.startswith(f"{param}_") and col != avg_col
        ]
        fig = px.line(
            sg_df,
            x='time',
            y=avg_col,
            title=f"{param} (Average)",
            labels={'time': 'Time', avg_col: param.capitalize()},
        )
        for source_col in param_sources:
            fig.add_scatter(
                x=sg_df['time'],
                y=sg_df[source_col],
                mode='lines',
                name=source_col.replace(f"{param}_", ""),
            )
        st.plotly_chart(fig, use_container_width=True)

    with st.expander("View Data Statistics"):
        for param in selected_params:
            avg_col = f"{param}_avg"
            if avg_col not in sg_df.columns:
                continue
            stats = sg_df[avg_col].describe()
            st.write(f"**Statistics for {param.capitalize()}:**")
            st.write(f"Minimum: {stats['min']:.2f}")
            st.write(f"Maximum: {stats['max']:.2f}")
            st.write(f"Mean: {stats['mean']:.2f}")
            st.write(f"Median: {stats['50%']:.2f}")
            st.write(f"Standard Deviation: {stats['std']:.2f}")
            st.markdown("---")

    if 'waveHeight_avg' in sg_df.columns and 'windSpeed_avg' in sg_df.columns:
        st.subheader("Wave Height vs Wind Speed")
        scatter_fig = px.scatter(
            sg_df,
            x='windSpeed_avg',
            y='waveHeight_avg',
            title="Correlation: Wave Height vs Wind Speed",
            labels={'windSpeed_avg': 'Wind Speed', 'waveHeight_avg': 'Wave Height'},
        )
        st.plotly_chart(scatter_fig, use_container_width=True)

    # Time-of-day analysis only when at least 24 hourly rows are available.
    if len(sg_df) >= 24:
        st.subheader("Time of Day Analysis")
        sg_df['hour'] = sg_df['time'].dt.hour
        for param in selected_params:
            avg_col = f"{param}_avg"
            if avg_col not in sg_df.columns:
                continue
            hourly_avg = sg_df.groupby('hour')[avg_col].mean().reset_index()
            hour_fig = px.line(
                hourly_avg,
                x='hour',
                y=avg_col,
                title=f"{param.capitalize()} by Hour of Day",
                labels={'hour': 'Hour of Day', avg_col: param.capitalize()},
            )
            st.plotly_chart(hour_fig, use_container_width=True)


def _render_marine_page():
    """Render the Stormglass page: location, time range, parameters and results."""
    st.title("Stormglass Marine & Weather Data")
    st.subheader("Detailed Marine Weather Conditions and Forecasts")

    # NOTE(review): only the location inputs and map are placed in the left
    # column; confirm this matches the intended layout.
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Location Settings")
        lat = st.number_input("Latitude", value=25.7617, step=0.001, format="%.4f")
        lng = st.number_input("Longitude", value=-80.1918, step=0.001, format="%.4f")
        st.map(pd.DataFrame({'lat': [lat], 'lon': [lng]}))

    # Date range selection
    st.subheader("Time Range")
    now = datetime.now()
    col_start, col_end = st.columns(2)
    with col_start:
        start_date = st.date_input("Start Date", now)
        start_time = st.time_input("Start Time", now.time())
    with col_end:
        end_date = st.date_input("End Date", now + timedelta(days=1))
        end_time = st.time_input("End Time", now.time())
    start_datetime = datetime.combine(start_date, start_time)
    end_datetime = datetime.combine(end_date, end_time)

    # Parameter selection
    st.subheader("Data Parameters")
    available_params = [
        'waveHeight', 'wavePeriod', 'waveDirection',
        'windSpeed', 'windDirection', 'airTemperature',
        'waterTemperature', 'seaLevel', 'humidity',
        'precipitation', 'visibility', 'currentSpeed',
        'currentDirection',
    ]
    selected_params = st.multiselect(
        "Select Parameters",
        available_params,
        default=['waveHeight', 'windSpeed', 'airTemperature', 'waterTemperature'],
    )

    if not st.button("Get Stormglass Data"):
        return
    if not selected_params:
        st.warning("Please select at least one parameter.")
        return
    if end_datetime <= start_datetime:
        st.warning("End time must be after start time.")
        return

    with st.spinner("Fetching Stormglass data..."):
        stormglass_data = fetch_stormglass_data(
            lat, lng, start_datetime, end_datetime, selected_params
        )
    if not stormglass_data:
        return  # the fetch helper has already reported the failure

    hours = stormglass_data.get('hours')
    if not hours:
        # An empty result would otherwise break the DataFrame handling below.
        st.warning("Stormglass returned no hourly data for the selected range.")
        return
    st.success(f"Retrieved {len(hours)} hours of data")
    _render_marine_results(hours, selected_params)


def _render_solunar_page():
    """Render the solunar/tide page: tide extremes, sun and moon details, tide chart."""
    # Imported here because the file's import block only brings in
    # plotly.express, not graph_objects.
    import plotly.graph_objects as go

    st.header("🌞 Solunar and Tide Information")
    st.subheader("Detailed Solunar and Tide Conditions")

    lat = st.number_input("Latitude", value=25.7617, step=0.001, format="%.4f")
    lon = st.number_input("Longitude", value=-80.1918, step=0.001, format="%.4f")
    now = datetime.now()
    input_date = st.date_input("Date", now)
    input_time = st.time_input("Time", now.time())
    datetime_input = datetime.combine(input_date, input_time)

    if not st.button("Retrieve Solunar and Tide Data"):
        return

    try:
        solunar_tide_data = fetch_solunar_tide_data(lat, lon, datetime_input)
        if not solunar_tide_data:
            st.warning("No solunar or tide data available for the specified date and location.")
            return

        tide_payload = solunar_tide_data.get('tide') or {}
        tide_df = pd.DataFrame(tide_payload['data']) if 'data' in tide_payload else None

        # Tide extremes as text
        if tide_df is not None:
            st.subheader("Tide Times")
            for _, tide in tide_df.iterrows():
                st.write(f"**{tide['type'].capitalize()} Tide:**")
                tide_time = pd.to_datetime(tide['time'])
                st.write(f"Time: {tide_time.strftime('%Y-%m-%d %H:%M:%S')}")
                st.write(f"Height: {tide.get('height', 'N/A')} meters")

        # The astronomy payload's 'data' may be a list with one entry per day;
        # use the first entry, and still accept a plain mapping.
        astro_data = (solunar_tide_data.get('astronomy') or {}).get('data')
        if isinstance(astro_data, list):
            astro_data = astro_data[0] if astro_data else None

        if isinstance(astro_data, dict):
            st.subheader("Sun and Moon Information")
            col1, col2 = st.columns(2)
            with col1:
                sunrise_time = pd.to_datetime(astro_data.get('sunrise'))
                st.metric("Sunrise", sunrise_time.strftime('%H:%M') if pd.notna(sunrise_time) else 'N/A')
                st.metric("Solar Noon", astro_data.get('solarNoon', 'N/A'))
            with col2:
                sunset_time = pd.to_datetime(astro_data.get('sunset'))
                st.metric("Sunset", sunset_time.strftime('%H:%M') if pd.notna(sunset_time) else 'N/A')
                st.metric("Day Length", astro_data.get('dayLength', 'N/A'))

            # moonPhase may be a bare number or a nested mapping whose
            # current.value holds the number.
            moon_phase = astro_data.get('moonPhase')
            if isinstance(moon_phase, dict):
                moon_phase = (moon_phase.get('current') or {}).get('value')
            if isinstance(moon_phase, (int, float)):
                st.subheader("Moon Phase Visualization")
                fig = go.Figure(go.Indicator(
                    mode="gauge+number+delta",
                    value=moon_phase,
                    title={'text': "Moon Phase"},
                    delta={'reference': 0.5, 'increasing': {'color': "RebeccaPurple"}},
                    gauge={
                        'axis': {'range': [0, 1], 'visible': False},
                        # The gauge has no 'width' property; bar thickness is
                        # the supported way to slim the indicator.
                        'bar': {'color': "lightgray", 'thickness': 0.25},
                        'bgcolor': "lightgray",
                        'borderwidth': 0,
                    },
                ))
                st.plotly_chart(fig, use_container_width=True)
            if 'moonAge' in astro_data:
                st.write(f"**Moon Age:** {astro_data['moonAge']} days")

        # Tide level visualization
        if tide_df is not None:
            st.subheader("Tide Level Visualization")
            tide_df['time'] = pd.to_datetime(tide_df['time'])
            tide_df['height'] = pd.to_numeric(tide_df['height'], errors='coerce')
            tide_fig = px.line(
                tide_df,
                x='time',
                y='height',
                title="Tide Levels Over Time",
                labels={'time': 'Time', 'height': 'Tide Height (m)'},
            )
            st.plotly_chart(tide_fig, use_container_width=True)
    except Exception as e:
        st.error(f"An error occurred while fetching solunar and tide data: {str(e)}")
        logging.error(f"Error in solunar and tide data fetching: {str(e)}")


def _render_fish_page():
    """Render the placeholder page for the fish categories data source."""
    st.header("🐟 Fish Categories Data")
    st.warning("Fish Categories Data functionality to be implemented")


# --- MAIN APPLICATION LOGIC ---
def main():
    """Render the page matching the sidebar's ``data_source`` selection."""
    page_renderers = {
        "Weather Data": _render_weather_page,
        "Marine": _render_marine_page,
        "Solunar & Tide Data": _render_solunar_page,
        "Fish Categories Data": _render_fish_page,
    }
    render_page = page_renderers.get(data_source)
    if render_page is not None:
        render_page()
# Footer (sidebar): separator, app name and attribution.
st.sidebar.markdown("---")
st.sidebar.markdown("🌍 Environmental Data Dashboard")
st.sidebar.markdown("Powered by Streamlit & Open APIs")

# --- RUN THE MAIN APPLICATION ---
# Only render the selected page when the file is executed as a script.
if __name__ == "__main__":
    main()