import os import sqlite3 import json import time import random import requests import pandas as pd import numpy as np from flask import Flask, render_template, jsonify, request app = Flask(__name__) app.config['SECRET_KEY'] = 'dev-secret-key' app.config['DATABASE'] = os.path.join(app.instance_path, 'maintenance.db') app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload size # Ensure instance folder exists try: os.makedirs(app.instance_path) except OSError: pass SILICONFLOW_API_KEY = "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi" SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions" def get_db(): conn = sqlite3.connect(app.config['DATABASE']) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_db() c = conn.cursor() # Assets Table c.execute(''' CREATE TABLE IF NOT EXISTS assets ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, type TEXT NOT NULL, location TEXT, status TEXT DEFAULT 'operational', health_score INTEGER DEFAULT 100, last_maintenance DATE ) ''') # Anomalies Table c.execute(''' CREATE TABLE IF NOT EXISTS anomalies ( id INTEGER PRIMARY KEY AUTOINCREMENT, asset_id INTEGER, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, metric TEXT, value REAL, threshold REAL, severity TEXT, diagnosis TEXT, recommendation TEXT, status TEXT DEFAULT 'new', FOREIGN KEY (asset_id) REFERENCES assets (id) ) ''') # Seed Data c.execute('SELECT count(*) FROM assets') if c.fetchone()[0] == 0: assets = [ ('CNC-Milling-01', 'CNC Machine', 'Zone A', 'operational', 95, '2025-01-15'), ('Hydraulic-Pump-04', 'Pump', 'Zone B', 'warning', 78, '2024-12-10'), ('Conveyor-Belt-Main', 'Conveyor', 'Zone A', 'operational', 98, '2025-02-01'), ('Robot-Arm-Welder', 'Robot', 'Zone C', 'critical', 45, '2024-11-20') ] c.executemany('INSERT INTO assets (name, type, location, status, health_score, last_maintenance) VALUES (?,?,?,?,?,?)', assets) conn.commit() conn.commit() conn.close() init_db() # --- Helpers --- def call_siliconflow(prompt, system_prompt="You are an expert Industrial AI assistant."): headers = { "Authorization": f"Bearer {SILICONFLOW_API_KEY}", "Content-Type": "application/json" } payload = { "model": "Qwen/Qwen2.5-7B-Instruct", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ], "temperature": 0.7 } try: response = requests.post(SILICONFLOW_API_URL, json=payload, headers=headers, timeout=30) response.raise_for_status() return response.json()['choices'][0]['message']['content'] except Exception as e: print(f"AI Error: {e}") return f"Error generating diagnosis: {str(e)}" # --- Routes --- @app.route('/') def index(): return render_template('index.html') @app.route('/api/assets', methods=['GET']) def get_assets(): conn = get_db() assets = conn.execute('SELECT * FROM assets').fetchall() conn.close() return jsonify([dict(a) for a in assets]) @app.route('/api/sensor_data/', methods=['GET']) def get_sensor_data(asset_id): # Mock sensor data generation based on asset health conn = get_db() asset = conn.execute('SELECT * FROM assets WHERE id = ?', (asset_id,)).fetchone() conn.close() if not asset: return jsonify({'error': 'Asset not found'}), 404 # Generate 50 points now = time.time() timestamps = [] vibration = [] temperature = [] base_vib = 2.0 if asset['health_score'] > 80 else (5.0 if asset['health_score'] > 50 else 8.0) base_temp = 45.0 if asset['health_score'] > 80 else (65.0 if asset['health_score'] > 50 else 85.0) for i in range(50): t = now - (50 - i) * 60 # Past 50 minutes timestamps.append(time.strftime('%H:%M', time.localtime(t))) # Add noise and occasional spikes vib_noise = np.random.normal(0, 0.5) temp_noise = np.random.normal(0, 2.0) v = base_vib + vib_noise tm = base_temp + temp_noise if asset['status'] in ['warning', 'critical'] and i > 40: # Recent spike v += random.uniform(2.0, 5.0) tm += random.uniform(5.0, 10.0) vibration.append(round(max(0, v), 2)) temperature.append(round(max(20, tm), 2)) return jsonify({ 'timestamps': timestamps, 'vibration': vibration, 'temperature': temperature, 'asset_name': asset['name'] }) @app.route('/api/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify({'error': 'No file part'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'No selected file'}), 400 if file: try: # Simple mock processing of uploaded file (e.g. historical data) # In a real app, we would parse CSV/JSON and update DB filename = file.filename return jsonify({'status': 'success', 'message': f'File {filename} uploaded and processed successfully'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/anomalies', methods=['GET', 'POST']) def handle_anomalies(): conn = get_db() if request.method == 'POST': data = request.json conn.execute(''' INSERT INTO anomalies (asset_id, metric, value, threshold, severity, status) VALUES (?, ?, ?, ?, ?, 'new') ''', (data['asset_id'], data['metric'], data['value'], data['threshold'], data['severity'])) conn.commit() conn.close() return jsonify({'status': 'recorded'}) else: anomalies = conn.execute(''' SELECT a.*, asst.name as asset_name FROM anomalies a JOIN assets asst ON a.asset_id = asst.id ORDER BY a.timestamp DESC ''').fetchall() conn.close() return jsonify([dict(a) for a in anomalies]) @app.route('/api/diagnose', methods=['POST']) def diagnose_anomaly(): data = request.json anomaly_id = data.get('id') asset_context = data.get('context', {}) # Construct prompt prompt = f""" Analyze the following industrial equipment anomaly and provide a diagnosis and maintenance recommendation. Asset: {asset_context.get('name')} ({asset_context.get('type')}) Metric: {asset_context.get('metric')} Current Value: {asset_context.get('value')} Threshold: {asset_context.get('threshold')} Severity: {asset_context.get('severity')} Please provide: 1. Potential Root Cause (Diagnosis) 2. Recommended Action (Maintenance Plan) 3. Estimated Urgency (High/Medium/Low) Return the response in JSON format with keys: 'diagnosis', 'recommendation', 'urgency'. ENSURE THE VALUES ARE IN CHINESE (SIMPLIFIED). """ system_prompt = "You are an expert Industrial Maintenance Engineer AI. Output strictly valid JSON. Use Chinese for diagnosis and recommendation." ai_response = call_siliconflow(prompt, system_prompt) # Parse JSON from AI (cleanup if needed) try: # Strip code blocks if present if "```json" in ai_response: ai_response = ai_response.split("```json")[1].split("```")[0].strip() elif "```" in ai_response: ai_response = ai_response.split("```")[1].split("```")[0].strip() result = json.loads(ai_response) # Update DB if anomaly_id: conn = get_db() conn.execute(''' UPDATE anomalies SET diagnosis = ?, recommendation = ?, status = 'diagnosed' WHERE id = ? ''', (result.get('diagnosis'), result.get('recommendation'), anomaly_id)) conn.commit() conn.close() return jsonify(result) except Exception as e: print(f"Parse Error: {e}, Response: {ai_response}") return jsonify({'error': 'Failed to parse AI response', 'raw': ai_response}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=True)