pranit144's picture
Upload 6 files
1c482cd verified
from flask import Flask, request, render_template, jsonify, redirect, url_for, flash
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
import json
import io
import base64
import os
import logging
from logging.handlers import RotatingFileHandler
from werkzeug.middleware.proxy_fix import ProxyFix
app = Flask(__name__)
app.secret_key = 'your_secret_key_here'
# Create upload folder and logs folder
UPLOAD_FOLDER = 'uploads'
LOGS_FOLDER = 'logs'
for folder in [UPLOAD_FOLDER, LOGS_FOLDER]:
if not os.path.exists(folder):
os.makedirs(folder)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)
# Setup logging
if not app.debug:
file_handler = RotatingFileHandler('logs/maintenance_dashboard.log', maxBytes=10240, backupCount=10)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('Maintenance Dashboard startup')
class DataStore:
def __init__(self):
self.uploaded_data = None
self.current_index = 0
self.processed_data = []
self.anomalies = []
data_store = DataStore()
def detect_anomalies(df):
"""Checks each row of the dataframe for readings that exceed preset thresholds."""
alerts = []
thresholds = {
'brakes': 90,
'filters':90,
'cables': 90
}
for idx, row in df.iterrows():
row_alerts = []
for component, threshold in thresholds.items():
if component in df.columns:
value = row[component]
if pd.notna(value) and value > threshold:
row_alerts.append(
f"{component.capitalize()} reading ({value}) exceeds threshold ({threshold})"
)
if row_alerts:
alerts.append({
'row': idx + 1,
'messages': row_alerts
})
return alerts
def analyze_component_trends(df):
"""Analyze trends and patterns in component data"""
trends = {}
for component in ['brakes', 'filters', 'cables']:
# Calculate rolling average to identify trends
rolling_avg = df[component].rolling(window=3).mean()
# Calculate rate of change
rate_of_change = df[component].diff().mean()
# Identify peak usage periods
peak_threshold = df[component].quantile(0.75)
peak_periods = df[component] > peak_threshold
trends[component] = {
'trend': 'Increasing' if rate_of_change > 1 else 'Decreasing' if rate_of_change < -1 else 'Stable',
'rate_of_change': round(rate_of_change, 2),
'peak_usage_frequency': round((peak_periods.sum() / len(df)) * 100, 1),
'recent_trend': 'Up' if rolling_avg.iloc[-1] > rolling_avg.iloc[-2] else 'Down'
}
return trends
def generate_maintenance_insights(df, trends):
"""Generate detailed maintenance insights based on data analysis"""
insights = {
'critical_analysis': [],
'maintenance_recommendations': [],
'preventive_measures': [],
'optimization_suggestions': []
}
for component in ['brakes', 'filters', 'cables']:
avg = df[component].mean()
max_val = df[component].max()
std_dev = df[component].std()
trend = trends[component]
# Critical Analysis
if avg > 70:
insights['critical_analysis'].append({
'component': component,
'severity': 'High' if avg > 80 else 'Medium',
'reason': f"Sustained high readings (avg: {round(avg, 1)}%)",
'trend': trend['trend'],
'impact': 'Immediate attention required' if avg > 80 else 'Monitor closely'
})
# Maintenance Recommendations
if trend['trend'] == 'Increasing' and avg > 60:
insights['maintenance_recommendations'].append({
'component': component,
'urgency': 'High' if avg > 75 else 'Medium',
'action': f"Schedule maintenance within {' 24 hours' if avg > 75 else ' one week'}",
'reason': f"Increasing trend with high average ({round(avg, 1)}%)"
})
# Preventive Measures
if std_dev > 10 or trend['peak_usage_frequency'] > 30:
insights['preventive_measures'].append({
'component': component,
'measure': f"Implement regular checks for {component}",
'frequency': 'Daily' if std_dev > 15 else 'Weekly',
'reason': f"High variability (±{round(std_dev, 1)}%) and frequent peak usage ({trend['peak_usage_frequency']}% of time)"
})
# Optimization Suggestions
if trend['rate_of_change'] > 2 or max_val > 90:
insights['optimization_suggestions'].append({
'component': component,
'suggestion': f"Review {component} usage patterns",
'potential_impact': 'High',
'expected_benefit': 'Reduced wear and extended component life'
})
return insights
def calculate_statistics(df):
"""Calculate important statistics from the data"""
stats = {
'component_averages': {
'brakes': round(df['brakes'].mean(), 2),
'filters': round(df['filters'].mean(), 2),
'cables': round(df['cables'].mean(), 2)
},
'critical_components': [],
'maintenance_suggestions': [],
'component_health': {}, # New: Component health status
'maintenance_priority': [], # New: Prioritized maintenance list
'performance_metrics': {}, # New: Detailed performance metrics
'detailed_analysis': {} # New: Detailed analysis
}
# Calculate component health and status
for component in ['brakes', 'filters', 'cables']:
avg = df[component].mean()
max_val = df[component].max()
min_val = df[component].min()
std_dev = df[component].std()
# Calculate health score (0-100)
health_score = max(0, min(100, 100 - (avg / 100 * 100)))
stats['component_health'][component] = {
'health_score': round(health_score, 1),
'average': round(avg, 2),
'max_reading': round(max_val, 2),
'min_reading': round(min_val, 2),
'variability': round(std_dev, 2),
'status': 'Good' if avg < 60 else 'Warning' if avg < 75 else 'Critical'
}
# Identify critical components
if avg > 70:
stats['critical_components'].append({
'name': component,
'avg_value': round(avg, 2),
'max_value': round(max_val, 2),
'health_score': round(health_score, 1),
'status': 'Critical' if avg > 80 else 'Warning',
'variability': round(std_dev, 2)
})
# Generate prioritized maintenance suggestions
for component, health in stats['component_health'].items():
if health['average'] > 80:
stats['maintenance_priority'].append({
'component': component,
'priority': 'High',
'timeline': 'Immediate',
'reason': f"Critical readings (Avg: {health['average']}%)",
'recommendation': f"Schedule immediate maintenance for {component}"
})
elif health['average'] > 70:
stats['maintenance_priority'].append({
'component': component,
'priority': 'Medium',
'timeline': 'Within 1 week',
'reason': f"Warning levels (Avg: {health['average']}%)",
'recommendation': f"Plan maintenance for {component} soon"
})
elif health['variability'] > 10:
stats['maintenance_priority'].append({
'component': component,
'priority': 'Low',
'timeline': 'Monitor',
'reason': f"High variability (±{health['variability']}%)",
'recommendation': f"Monitor {component} performance"
})
# Generate detailed maintenance suggestions
for component, health in stats['component_health'].items():
suggestions = []
if health['average'] > 80:
suggestions.append(f"URGENT: Immediate maintenance required - {component} showing critical wear")
elif health['average'] > 70:
suggestions.append(f"WARNING: Schedule maintenance soon - {component} performance degrading")
if health['variability'] > 10:
suggestions.append(f"Monitor {component} - Showing inconsistent readings (±{health['variability']}%)")
if health['max_reading'] > 90:
suggestions.append(f"Investigate {component} peak readings of {health['max_reading']}%")
if suggestions:
stats['maintenance_suggestions'].extend(suggestions)
# Calculate performance metrics
stats['performance_metrics'] = {
'overall_health': round(sum(h['health_score'] for h in stats['component_health'].values()) / 3, 1),
'critical_count': len([h for h in stats['component_health'].values() if h['status'] == 'Critical']),
'warning_count': len([h for h in stats['component_health'].values() if h['status'] == 'Warning']),
'healthy_count': len([h for h in stats['component_health'].values() if h['status'] == 'Good'])
}
# Add new detailed analyses
trends = analyze_component_trends(df)
maintenance_insights = generate_maintenance_insights(df, trends)
stats['detailed_analysis'] = {
'trends': trends,
'insights': maintenance_insights
}
return stats
def create_graphs(df):
"""Create all visualization graphs"""
graphs = {}
# Gauge Charts for all components
components = ['brakes', 'filters', 'cables']
for component in components:
latest_value = df[component].iloc[-1]
gauge = go.Figure(go.Indicator(
mode="gauge+number+delta",
value=latest_value,
delta={'reference': df[component].iloc[-2] if len(df) > 1 else latest_value},
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': f"Latest {component.title()} Reading"},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': "#1f77b4"},
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 90
},
'steps': [
{'range': [0, 60], 'color': "#3feb48"},
{'range': [60, 80], 'color': "#ebeb3f"},
{'range': [80, 100], 'color': "#eb3f3f"}
]
}))
gauge.update_layout(height=300)
graphs[f'{component}_gauge'] = gauge.to_html(full_html=False)
# Bar Chart for current readings with historical average
current_values = [df[comp].iloc[-1] for comp in components]
avg_values = [df[comp].mean() for comp in components]
bar = go.Figure(data=[
go.Bar(name='Current Reading', x=components, y=current_values),
go.Bar(name='Historical Average', x=components, y=avg_values)
])
bar.update_layout(
title="Component Readings Comparison",
barmode='group',
height=400
)
graphs['bar'] = bar.to_html(full_html=False)
# Time Series Chart with Moving Average
fig_time = go.Figure()
for component in components:
# Add raw data
fig_time.add_trace(go.Scatter(
y=df[component],
name=component.title(),
mode='lines'
))
# Add moving average
ma = df[component].rolling(window=3).mean()
fig_time.add_trace(go.Scatter(
y=ma,
name=f"{component.title()} MA",
line=dict(dash='dash'),
opacity=0.5
))
fig_time.update_layout(
title='Component Readings Over Time',
height=400,
showlegend=True,
legend=dict(
orientation="h",
yanchor="bottom",
y=1.02,
xanchor="right",
x=1
)
)
graphs['timeseries'] = fig_time.to_html(full_html=False)
# Correlation Matrix Heatmap
corr_matrix = df[components].corr()
heatmap = go.Figure(data=go.Heatmap(
z=corr_matrix,
x=components,
y=components,
colorscale='RdBu',
zmin=-1,
zmax=1
))
heatmap.update_layout(
title='Component Correlation Matrix',
height=400
)
graphs['heatmap'] = heatmap.to_html(full_html=False)
# Box Plot for Distribution Analysis
box_data = [go.Box(y=df[component], name=component.title()) for component in components]
box_plot = go.Figure(data=box_data)
box_plot.update_layout(
title='Component Value Distributions',
height=400
)
graphs['box_plot'] = box_plot.to_html(full_html=False)
# Scatter Matrix
scatter_matrix = px.scatter_matrix(
df[components],
dimensions=components,
title='Component Relationships Matrix'
)
scatter_matrix.update_layout(height=600)
graphs['scatter_matrix'] = scatter_matrix.to_html(full_html=False)
return graphs
@app.route('/')
def index():
return render_template('index.html',
data=None,
graphs=None,
stats=None,
anomalies=None)
@app.route('/upload', methods=['POST'])
def upload():
if 'file' not in request.files:
flash('No file uploaded', 'error')
return redirect(url_for('index'))
file = request.files['file']
if file.filename == '':
flash('No file selected', 'error')
return redirect(url_for('index'))
try:
# Read the file
if file.filename.endswith('.csv'):
df = pd.read_csv(file)
elif file.filename.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file)
else:
flash('Unsupported file format. Please upload CSV or Excel file.', 'error')
return redirect(url_for('index'))
# Validate required columns
required_columns = ['brakes', 'filters', 'cables']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
flash(f"Missing required columns: {', '.join(missing_columns)}", 'error')
return redirect(url_for('index'))
# Store the data and process it
data_store.uploaded_data = df
data_store.anomalies = detect_anomalies(df)
# Calculate statistics
stats = calculate_statistics(df)
# Create graphs
graphs = create_graphs(df)
# Render template with all the data
return render_template('index.html',
data=df.to_dict('records'),
graphs=graphs,
stats=stats,
anomalies=data_store.anomalies)
except Exception as e:
flash(f"Error processing file: {str(e)}", 'error')
return redirect(url_for('index'))
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5000))
app.run(host='0.0.0.0', port=port)