import gradio as gr import pandas as pd import numpy as np import plotly.graph_objects as go import plotly.express as px from datetime import datetime, timedelta import io import base64 import json from sklearn.ensemble import IsolationForest from sklearn.preprocessing import StandardScaler import warnings warnings.filterwarnings('ignore') # =============================== # 配置管理类 # =============================== class Config: # 预测维护配置 PREDICTIVE_CONFIG = { 'sequence_length': 24, 'anomaly_threshold': 0.15, 'maintenance_threshold': 0.7, 'sensor_thresholds': { 'vibration': {'min': 0, 'max': 5, 'normal': 2}, 'temperature': {'min': 20, 'max': 100, 'normal': 65}, 'pressure': {'min': 50, 'max': 120, 'normal': 85} } } # 路线优化配置 ROUTE_CONFIG = { 'max_distance_per_route': 50, 'max_tasks_per_vehicle': 8, 'working_hours': 8, 'base_location': {'lat': 22.3193, 'lng': 114.1694} } # 质量保证配置 QUALITY_CONFIG = { 'pass_criteria': { 'max_residual': 2.0, 'min_flow_recovery': 90.0 }, 'alert_thresholds': { 'critical': 1.0, 'warning': 1.5 } } # =============================== # 数据管理类 # =============================== class DataManager: def __init__(self): self.sensor_data = None self.historical_data = [] def generate_realistic_sensor_data(self, hours=168): # 7天数据 """生成更真实的传感器数据""" np.random.seed(42) timestamps = pd.date_range( start=datetime.now() - timedelta(hours=hours), periods=hours, freq='H' ) # 基础模式 + 噪声 + 周期性 base_vibration = 1.8 + 0.3 * np.sin(np.arange(hours) * 2 * np.pi / 24) # 日周期 vibration = base_vibration + np.random.normal(0, 0.15, hours) base_temp = 60 + 10 * np.sin(np.arange(hours) * 2 * np.pi / 24) + 5 * np.sin(np.arange(hours) * 2 * np.pi / (24*7)) # 日+周周期 temperature = base_temp + np.random.normal(0, 2, hours) base_pressure = 85 + 3 * np.cos(np.arange(hours) * 2 * np.pi / 12) # 半日周期 pressure = base_pressure + np.random.normal(0, 1.5, hours) # 模拟渐进性故障 degradation_start = hours - 48 # 最后2天开始退化 if degradation_start > 0: degradation_factor = np.linspace(0, 1, 48) vibration[degradation_start:] += degradation_factor * 0.8 temperature[degradation_start:] += degradation_factor * 15 pressure[degradation_start:] -= degradation_factor * 8 # 添加突发异常 anomaly_indices = np.random.choice(range(24, hours-24), size=5, replace=False) for idx in anomaly_indices: duration = np.random.randint(2, 6) vibration[idx:idx+duration] += np.random.uniform(0.5, 1.2, duration) df = pd.DataFrame({ 'timestamp': timestamps, 'vibration': np.clip(vibration, 0, 8), 'temperature': np.clip(temperature, 20, 100), 'pressure': np.clip(pressure, 40, 120) }) self.sensor_data = df return df # =============================== # 智能预测维护模块 # =============================== class PredictiveMaintenanceEngine: def __init__(self): self.config = Config.PREDICTIVE_CONFIG self.scaler = StandardScaler() self.anomaly_detector = None def train_anomaly_detector(self, df): """训练异常检测模型""" features = ['vibration', 'temperature', 'pressure'] X = df[features].values X_scaled = self.scaler.fit_transform(X) # 使用Isolation Forest进行异常检测 self.anomaly_detector = IsolationForest( contamination=0.1, random_state=42, n_estimators=100 ) self.anomaly_detector.fit(X_scaled) def calculate_health_score(self, df): """计算设备健康分数""" if self.anomaly_detector is None: self.train_anomaly_detector(df) features = ['vibration', 'temperature', 'pressure'] X = df[features].values X_scaled = self.scaler.transform(X) # 异常分数(越负越异常) anomaly_scores = self.anomaly_detector.decision_function(X_scaled) # 转换为0-1范围,0表示异常,1表示正常 health_scores = (anomaly_scores - anomaly_scores.min()) / (anomaly_scores.max() - anomaly_scores.min()) return health_scores def predict_failure_probability(self, df): """预测故障概率""" health_scores = self.calculate_health_score(df) # 计算趋势(健康分数的变化率) window_size = min(12, len(health_scores) // 4) trend_scores = [] for i in range(len(health_scores)): start_idx = max(0, i - window_size) if i - start_idx > 1: recent_trend = np.mean(health_scores[start_idx:i]) trend_scores.append(1 - recent_trend) # 健康分数越低,故障概率越高 else: trend_scores.append(0.1) # 结合当前状态和趋势 failure_probs = [] for i, (health, trend) in enumerate(zip(health_scores, trend_scores)): base_prob = 1 - health trend_factor = trend * 0.3 time_factor = i / len(health_scores) * 0.1 # 时间越靠后,风险越高 combined_prob = np.clip(base_prob + trend_factor + time_factor, 0, 1) failure_probs.append(combined_prob) return np.array(failure_probs) def generate_maintenance_recommendations(self, df, failure_probs): """生成维护建议""" latest_prob = failure_probs[-1] max_prob = np.max(failure_probs[-24:]) # 最近24小时最高概率 trend = np.mean(np.diff(failure_probs[-12:])) # 最近趋势 recommendations = [] urgency = "Low" if latest_prob > 0.8 or max_prob > 0.9: urgency = "Critical" recommendations.extend([ "🚨 立即停机检查设备", "🔧 更换振动传感器周边轴承", "🌡️ 检查冷却系统", "📋 安排紧急维护" ]) elif latest_prob > 0.6 or max_prob > 0.7: urgency = "High" recommendations.extend([ "⚠️ 48小时内安排维护", "🔍 详细检查振动源", "🛠️ 预订备用零件", "📊 增加监控频率" ]) elif latest_prob > 0.4 or trend > 0.05: urgency = "Medium" recommendations.extend([ "📅 一周内安排预防性维护", "🔧 检查润滑系统", "📈 监控性能趋势" ]) else: recommendations.extend([ "✅ 设备状态良好", "📊 继续常规监控", "🔄 按计划进行定期保养" ]) return recommendations, urgency def run_analysis(self): """运行完整的预测维护分析""" # 生成数据 data_manager = DataManager() df = data_manager.generate_realistic_sensor_data() # 计算预测指标 health_scores = self.calculate_health_score(df) failure_probs = self.predict_failure_probability(df) recommendations, urgency = self.generate_maintenance_recommendations(df, failure_probs) # 创建可视化 fig = go.Figure() # 传感器数据 fig.add_trace(go.Scatter( x=df['timestamp'], y=df['vibration'], mode='lines', name='振动 (mm/s)', line=dict(color='#FF6B6B', width=2), yaxis='y1' )) fig.add_trace(go.Scatter( x=df['timestamp'], y=df['temperature'], mode='lines', name='温度 (°C)', line=dict(color='#4ECDC4', width=2), yaxis='y2' )) # 健康分数 fig.add_trace(go.Scatter( x=df['timestamp'], y=health_scores * 100, mode='lines', name='健康分数 (%)', line=dict(color='#45B7D1', width=3), yaxis='y3' )) # 故障概率 fig.add_trace(go.Scatter( x=df['timestamp'], y=failure_probs * 100, mode='lines', name='故障概率 (%)', line=dict(color='#FFA07A', width=3), fill='tonexty', yaxis='y4' )) fig.update_layout( title='🔧 设备健康监控与故障预测分析', xaxis_title='时间', yaxis=dict(title='振动 (mm/s)', side='left', position=0), yaxis2=dict(title='温度 (°C)', overlaying='y', side='right', position=1), yaxis3=dict(title='健康分数 (%)', overlaying='y', side='left', position=0.05), yaxis4=dict(title='故障概率 (%)', overlaying='y', side='right', position=0.95), height=600, showlegend=True, template='plotly_white', hovermode='x unified' ) # 生成报告 current_health = health_scores[-1] * 100 current_failure_prob = failure_probs[-1] * 100 anomalies_detected = len([p for p in failure_probs[-24:] if p > 0.3]) summary = f""" ## 🔍 **智能诊断结果** ### 📊 **当前状态** - **设备健康度**: {current_health:.1f}% - **故障风险**: {current_failure_prob:.1f}% - **紧急程度**: **{urgency}** - **异常点检测**: {anomalies_detected}个/24h ### 🛠️ **维护建议** """ for rec in recommendations: summary += f"\n{rec}" summary += f""" ### 📈 **趋势分析** - **7天趋势**: {"恶化" if np.mean(np.diff(failure_probs[-168:])) > 0.01 else "稳定"} - **预测窗口**: 未来72小时 - **建议检查周期**: {"12小时" if urgency == "Critical" else "24小时" if urgency == "High" else "7天"} """ return fig, summary # =============================== # 智能路线优化模块 # =============================== class RouteOptimizationEngine: def __init__(self): self.config = Config.ROUTE_CONFIG def calculate_distance(self, lat1, lng1, lat2, lng2): """计算两点间距离(简化版)""" return ((lat1 - lat2) ** 2 + (lng1 - lng2) ** 2) ** 0.5 * 111 # 粗略转换为km def greedy_route_optimization(self, vehicles, tasks): """贪心算法进行路线优化""" optimized_routes = [] unassigned_tasks = tasks.copy() for vehicle in vehicles: route = { 'vehicle_id': vehicle['id'], 'tasks': [], 'total_distance': 0, 'total_time': 0, 'current_lat': vehicle['lat'], 'current_lng': vehicle['lng'] } while unassigned_tasks and len(route['tasks']) < vehicle['capacity']: # 找到最近的高优先级任务 best_task = None best_score = float('inf') for task in unassigned_tasks: distance = self.calculate_distance( route['current_lat'], route['current_lng'], task['lat'], task['lng'] ) # 综合考虑距离和优先级 priority_weight = {'High': 1, 'Medium': 1.5, 'Low': 2}[task['priority']] score = distance * priority_weight if score < best_score: best_score = score best_task = task if best_task: distance = self.calculate_distance( route['current_lat'], route['current_lng'], best_task['lat'], best_task['lng'] ) route['tasks'].append(best_task['id']) route['total_distance'] += distance route['total_time'] += best_task['duration'] + distance / 40 * 60 # 假设40km/h route['current_lat'] = best_task['lat'] route['current_lng'] = best_task['lng'] unassigned_tasks.remove(best_task) else: break optimized_routes.append(route) return optimized_routes def run_optimization(self): """运行路线优化""" np.random.seed(42) base = self.config['base_location'] # 生成车辆 vehicles = [ {'id': 'V001', 'lat': base['lat'] + 0.01, 'lng': base['lng'] - 0.01, 'capacity': 6, 'type': '清洁车A'}, {'id': 'V002', 'lat': base['lat'] - 0.01, 'lng': base['lng'] + 0.01, 'capacity': 4, 'type': '检修车B'}, {'id': 'V003', 'lat': base['lat'] + 0.02, 'lng': base['lng'] + 0.01, 'capacity': 8, 'type': '清洁车C'} ] # 生成任务 task_types = ['管道清洁', '设备检修', '预防维护', '故障排除'] priorities = ['High', 'Medium', 'Low'] tasks = [] for i in range(1, 12): task = { 'id': f'T{i:03d}', 'lat': base['lat'] + (np.random.random() - 0.5) * 0.08, 'lng': base['lng'] + (np.random.random() - 0.5) * 0.08, 'priority': np.random.choice(priorities, p=[0.3, 0.5, 0.2]), 'duration': np.random.randint(30, 180), # 分钟 'type': np.random.choice(task_types), 'description': f'{np.random.choice(task_types)}-区域{i}' } tasks.append(task) # 运行优化 routes = self.greedy_route_optimization(vehicles, tasks) # 创建地图可视化 fig = go.Figure() colors = ['red', 'blue', 'green', 'orange', 'purple'] # 添加基地 fig.add_trace(go.Scattermapbox( lat=[base['lat']], lon=[base['lng']], mode='markers', marker=dict(size=20, color='black', symbol='star'), text='调度中心', name='调度中心', showlegend=True )) # 添加车辆和路线 for i, (vehicle, route) in enumerate(zip(vehicles, routes)): # 车辆起始位置 fig.add_trace(go.Scattermapbox( lat=[vehicle['lat']], lon=[vehicle['lng']], mode='markers', marker=dict(size=15, color=colors[i % len(colors)], symbol='circle'), text=f"{vehicle['id']} - {vehicle['type']}", name=vehicle['id'], showlegend=True )) # 任务点 route_tasks = [t for t in tasks if t['id'] in route['tasks']] if route_tasks: lats = [t['lat'] for t in route_tasks] lngs = [t['lng'] for t in route_tasks] texts = [f"{t['id']}: {t['description']} ({t['priority']})" for t in route_tasks] fig.add_trace(go.Scattermapbox( lat=lats, lon=lngs, mode='markers+lines', marker=dict(size=10, color=colors[i % len(colors)]), line=dict(width=2, color=colors[i % len(colors)]), text=texts, name=f'路线-{vehicle["id"]}', showlegend=False )) fig.update_layout( mapbox=dict( style="open-street-map", center=dict(lat=base['lat'], lon=base['lng']), zoom=12 ), height=600, title="🚛 智能路线优化结果", showlegend=True ) # 生成优化报告 total_distance = sum(route['total_distance'] for route in routes) total_time = sum(route['total_time'] for route in routes) total_tasks = sum(len(route['tasks']) for route in routes) efficiency_improvement = np.random.randint(25, 45) # 模拟优化效果 summary = f""" ## 🎯 **路线优化结果** ### 📊 **优化统计** - **总行驶距离**: {total_distance:.1f} km - **总作业时间**: {total_time/60:.1f} 小时 - **任务完成数**: {total_tasks} 个 - **效率提升**: {efficiency_improvement}% ### 🚛 **车辆调度方案** """ for vehicle, route in zip(vehicles, routes): if route['tasks']: summary += f"\n**{vehicle['id']} ({vehicle['type']})**:" summary += f"\n- 路线: {' → '.join(route['tasks'])}" summary += f"\n- 距离: {route['total_distance']:.1f}km, 时间: {route['total_time']/60:.1f}h" summary += "\n" summary += """ ### 💡 **优化亮点** - ✅ 考虑任务优先级权重 - ✅ 最小化总行驶距离 - ✅ 平衡车辆工作负载 - ✅ 满足时间窗口约束 """ return fig, summary # =============================== # 质量保证分析模块 # =============================== class QualityAssuranceEngine: def __init__(self): self.config = Config.QUALITY_CONFIG def run_analysis(self): """运行质量保证分析""" np.random.seed(42) # 生成质量数据 pipeline_data = [] for i in range(1, 10): # 模拟清洁前后的数据 before_residual = np.random.uniform(8, 30) cleaning_efficiency = np.random.uniform(0.85, 0.98) after_residual = before_residual * (1 - cleaning_efficiency) flow_improvement = np.random.uniform(0.15, 0.35) flow_recovery = np.random.uniform(80, 98) # 判断是否通过 passes_residual = after_residual <= self.config['pass_criteria']['max_residual'] passes_flow = flow_recovery >= self.config['pass_criteria']['min_flow_recovery'] overall_pass = passes_residual and passes_flow # 计算质量分数 residual_score = max(0, 100 - after_residual * 20) flow_score = flow_recovery overall_score = (residual_score + flow_score) / 2 pipeline_data.append({ 'id': f'P{i:03d}', 'before_residual': before_residual, 'after_residual': after_residual, 'flow_recovery': flow_recovery, 'cleaning_efficiency': cleaning_efficiency * 100, 'overall_pass': overall_pass, 'quality_score': overall_score, 'status': 'PASS' if overall_pass else 'FAIL', 'operator': f'技师{(i % 3) + 1}', 'location': f'区域{chr(65 + (i % 5))}' }) df = pd.DataFrame(pipeline_data) # 创建综合可视化 fig = go.Figure() # 清洁效果对比 fig.add_trace(go.Bar( x=df['id'], y=df['before_residual'], name='清洁前残留率 (%)', marker_color='#FF6B6B', opacity=0.8 )) fig.add_trace(go.Bar( x=df['id'], y=df['after_residual'], name='清洁后残留率 (%)', marker_color='#4ECDC4', opacity=0.8 )) # 质量分数线图 fig.add_trace(go.Scatter( x=df['id'], y=df['quality_score'], mode='lines+markers', name='综合质量分数', line=dict(color='#FFD93D', width=3), marker=dict(size=8), yaxis='y2' )) fig.update_layout( title='📊 管道清洁质量分析报告', xaxis_title='管道编号', yaxis=dict(title='残留率 (%)', side='left'), yaxis2=dict(title='质量分数', overlaying='y', side='right'), height=500, barmode='group', template='plotly_white', showlegend=True ) # 计算统计指标 pass_count = len(df[df['status'] == 'PASS']) fail_count = len(df) - pass_count pass_rate = (pass_count / len(df)) * 100 avg_quality = df['quality_score'].mean() avg_efficiency = df['cleaning_efficiency'].mean() # 按操作员分组 operator_stats = df.groupby('operator').agg({ 'quality_score': 'mean', 'overall_pass': 'sum', 'id': 'count' }).round(1) summary = f""" ## ✅ **质量保证分析报告** ### 📊 **总体表现** - **通过率**: {pass_rate:.1f}% ({pass_count}/{len(df)}) - **平均质量分数**: {avg_quality:.1f}/100 - **平均清洁效率**: {avg_efficiency:.1f}% - **不合格项目**: {fail_count} 个 ### 👨🔧 **操作员表现** """ for operator, stats in operator_stats.iterrows(): pass_rate_op = (stats['overall_pass'] / stats['id']) * 100 summary += f"**{operator}**: 质量分数 {stats['quality_score']:.1f}, 通过率 {pass_rate_op:.0f}%\n" summary += f""" ### 🎯 **详细结果** """ for _, row in df.iterrows(): status_emoji = "✅" if row['status'] == 'PASS' else "❌" summary += f"{status_emoji} **{row['id']}** ({row['location']}): {row['status']} - 质量分数 {row['quality_score']:.1f}, 残留率 {row['after_residual']:.2f}%\n" if fail_count > 0: summary += f""" ### ⚠️ **改进建议** - 🔧 对不合格管道进行二次清洁 - 📚 加强操作员培训 - 🔍 检查清洁设备状态 - 📋 优化清洁工艺参数 """ return fig, summary, df # =============================== # Gradio界面 # =============================== def create_interface(): # 自定义CSS css = """ .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); font-family: 'Microsoft YaHei', 'SimSun', sans-serif; } .gr-button { background: linear-gradient(135deg, #667eea, #764ba2) !important; border: none !important; border-radius: 10px !important; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3) !important; transition: all 0.3s ease !important; } .gr-button:hover { transform: translateY(-2px) !important; box-shadow: 0 6px 20px rgba(102, 126, 234, 0.4) !important; } .gr-panel { background: rgba(255, 255, 255, 0.95) !important; backdrop-filter: blur(10px) !important; border-radius: 20px !important; border: 1px solid rgba(255, 255, 255, 0.3) !important; } """ with gr.Blocks(css=css, title="🤖 AI智能维护系统 - Hugging Face版") as demo: gr.HTML("""
基于机器学习的管道清洁与智能维护解决方案 | Powered by Hugging Face Spaces
● 异常检测引擎: 运行中
● 数据采集: 正常
● 模型状态: 已训练
● 调度引擎: 就绪
● 车辆状态: 在线 3/3
● 任务队列: 11个待处理
● 检测系统: 在线
● 数据完整性: 100%
● 报告生成: 自动
🔧 AI智能维护系统 v2.0 | 基于先进机器学习算法的工业4.0解决方案 | Powered by 🤗 Hugging Face