import os
import time
import threading
import json
import random
import subprocess
import sys
from datetime import datetime
from flask import Flask, jsonify, request, render_template, Response, send_from_directory
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from concurrent.futures import ThreadPoolExecutor

# Import local modules from backend folder
from payload_preparation.main import PayloadPreparer
from forecast_engine.main import ForecastEngine
from dotenv import load_dotenv
from supabase import create_client, Client

# Load environment variables (e.g. SUPABASE_URL / SUPABASE_KEY) from a .env file.
load_dotenv()

# ==========================================
# APP CONFIGURATION
# ==========================================
# Frontend is located at the root level, relative to the backend folder
frontend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'frontend'))
templates_dir = os.path.join(frontend_dir, 'templates')
components_dir = os.path.join(templates_dir, 'components')
# Static assets are served from the backend's own ./static folder.
static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static'))

app = Flask(
    __name__,
    template_folder=templates_dir,
    static_folder=static_dir,
    static_url_path='/static',
)
CORS(app)

# ... removed from here ...
# The SQLite database file lives in ./database next to this module.
basedir = os.path.abspath(os.path.dirname(__file__))
db_dir = os.path.join(basedir, 'database')
# exist_ok=True replaces the separate os.path.exists() check, which could race
# with another process creating the directory between the check and the call.
os.makedirs(db_dir, exist_ok=True)

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(db_dir, 'smart_grid.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    "connect_args": {"timeout": 30},
    "pool_pre_ping": True,
}

db = SQLAlchemy(app)

# Enable WAL mode for SQLite to handle concurrent writes
with app.app_context():
    from sqlalchemy import text
    db.session.execute(text("PRAGMA journal_mode=WAL"))
    db.session.commit()

# Shared in-memory state. The background workers rebind these names while
# holding _state_lock; the API routes read them under the same lock.
_state_lock = threading.Lock()
LATEST_GRID_PAYLOAD = {}
LATEST_FORECAST_PAYLOAD = {}

# Supabase Configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
# Stays None when credentials are absent or client creation fails.
supabase: Client = None

if SUPABASE_URL and SUPABASE_KEY:
    try:
        supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
        print(f"[SUPABASE] Successfully initialized connection to {SUPABASE_URL}")
    except Exception as e:
        print(f"[SUPABASE] Initialization failed: {e}")
else:
    print("[SUPABASE] No cloud credentials found in .env. Falling back to local SQLite only.")


def sync_to_supabase(table_name, data):
    """Mirror local database records to Supabase (best effort).

    Does nothing when no Supabase client was initialised. Any exception raised
    while preparing or sending the rows is caught and printed, never re-raised.

    Args:
        table_name: Name of the Supabase table to upsert into.
        data: A single row dict or an iterable of row dicts.
    """
    if not supabase:
        return

    try:
        # Supabase expects a list of dicts for bulk insert/upsert
        if isinstance(data, dict):
            data = [data]

        # Handle datetime serialization
        processed_data = []
        for row in data:
            new_row = {}
            for k, v in row.items():
                if isinstance(v, datetime):
                    new_row[k] = v.isoformat()
                elif hasattr(v, '__dict__'):
                    # Values carrying a __dict__ (arbitrary objects) are dropped
                    # from the mirrored row.
                    continue
                else:
                    new_row[k] = v
            processed_data.append(new_row)

        if processed_data:
            supabase.table(table_name).upsert(processed_data).execute()
    except Exception as e:
        # We don't want to crash the main engine if cloud sync fails
        print(f"[SUPABASE] Sync warning ({table_name}): {e}")


forecast_engine = ForecastEngine()


# ==========================================
# DATABASE MODELS
# ==========================================
class MeterTelemetry(db.Model):
    """One telemetry reading for a single meter (table ``meter_telemetry``)."""

    __tablename__ = 'meter_telemetry'

    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.DateTime, index=True)
    meter_id = db.Column(db.String(100), index=True)
    dt_id = db.Column(db.String(100))
    consumer_type = db.Column(db.String(30))
    lat = db.Column(db.Float)
    lng = db.Column(db.Float)

    # Layer 1: Hardware
    active_kw = db.Column(db.Float)
    reactive_kvar = db.Column(db.Float)
    voltage_r = db.Column(db.Float)
    voltage_y = db.Column(db.Float)
    voltage_b = db.Column(db.Float)
    current_r = db.Column(db.Float)
    current_y = db.Column(db.Float)
    current_b = db.Column(db.Float)
    power_factor = db.Column(db.Float)
    cover_open = db.Column(db.Boolean, default=False)
    magnetic_interference = db.Column(db.Boolean, default=False)
    ping_status = db.Column(db.String(20), default="ONLINE")

    # Layer 2: Behavioral
    rolling_step_change_pct = db.Column(db.Float, default=0.0)
    night_to_day_usage_ratio = db.Column(db.Float, default=1.0)
    vacation_flatline_indicator = db.Column(db.Boolean, default=False)

    # Layer 3: Spatial
    peer_cluster_deviation_pct = db.Column(db.Float, default=0.0)
    neighborhood_outage_mask = db.Column(db.Boolean, default=False)

    # Layer 4: Exogenous
    solar_ghi_w_m2 = db.Column(db.Float, default=0.0)
    apparent_temperature_c = db.Column(db.Float, default=25.0)
    karnataka_holiday_flag = db.Column(db.Boolean, default=False)

    # Layer 5: Financial
    consumer_tariff_slab = db.Column(db.String(30), default="LT-2")
    rupee_leakage_multiplier = db.Column(db.Float, default=0.0)
    gruha_jyothi_subsidy_flag = db.Column(db.Boolean, default=False)

    # ML Outputs
    anomaly_flag = db.Column(db.String(80))
    ml_likelihood = db.Column(db.Float)
    tamper_status = db.Column(db.String(200))


class DTTelemetry(db.Model):
    """One telemetry reading for a single DT (table ``dt_telemetry``)."""

    __tablename__ = 'dt_telemetry'

    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.DateTime, index=True)
    dt_id = db.Column(db.String(100), index=True)
    lat = db.Column(db.Float)
    lng = db.Column(db.Float)
    rated_capacity_kw = db.Column(db.Float)
    master_meter_kw = db.Column(db.Float)
    smart_meters_sum_kw = db.Column(db.Float)
    technical_loss_kw = db.Column(db.Float)
    commercial_theft_loss_kw = db.Column(db.Float)
    stress_pct = db.Column(db.Float)
    ambient_temp = db.Column(db.Float)
    is_holiday = db.Column(db.Boolean)


class ComplianceCase(db.Model):
    """A compliance case raised against a meter (table ``compliance_cases``)."""

    __tablename__ = 'compliance_cases'

    id = db.Column(db.Integer, primary_key=True)
    meter_id = db.Column(db.String(100), index=True)
    dt_id = db.Column(db.String(100))
    feeder_id = db.Column(db.String(100))
    anomaly_type = db.Column(db.String(100))
    ml_likelihood = db.Column(db.Float)
    estimated_leakage_kw = db.Column(db.Float)
    # detected, dispatched, inspected, resolved, false_positive
    status = db.Column(db.String(30), default='detected')
    fir_id = db.Column(db.String(100))
    dispatch_id = db.Column(db.String(100))
    officer_id = db.Column(db.String(100))
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
    resolved_at = db.Column(db.DateTime)
    notes = db.Column(db.Text)


# ==========================================
# BACKGROUND WORKERS
# ==========================================
def _parallel_generate(preparer):
    """Build one grid snapshot by generating every substation concurrently.

    Args:
        preparer: Object exposing ``weather``, ``behavior`` and ``spatial``
            context providers and ``generate_substation_node``.

    Returns:
        A ``(payload, meter_records, dt_records)`` tuple: the nested payload
        dict and the flat record lists collected from all substations.
    """
    now = datetime.now()
    hour = now.hour
    weather_ctx = preparer.weather.get_weather_context()
    behavior_ctx = preparer.behavior.get_behavioral_context()
    substations = preparer.spatial.get_grid_infrastructure()

    all_sub_entries = []
    all_m_recs = []
    all_dt_recs = []

    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            executor.submit(preparer.generate_substation_node, sub, now, hour, weather_ctx, behavior_ctx)
            for sub in substations
        ]
        # Results are consumed in submission order, so output order matches
        # the order of `substations`.
        for future in futures:
            sub_entry, m_recs, dt_recs = future.result()
            all_sub_entries.append(sub_entry)
            all_m_recs.extend(m_recs)
            all_dt_recs.extend(dt_recs)

    payload = {
        "metadata": {
            "timestamp": now.isoformat(),
            "substations": len(substations),
            "total_meters": len(all_m_recs),
            "total_dts": len(all_dt_recs)
        },
        "environment": {
            "weather": weather_ctx,
            "behavioral": behavior_ctx
        },
        "substations": all_sub_entries
    }
    return payload, all_m_recs, all_dt_recs


def background_data_generator():
    """Generate telemetry in an endless loop, publish it and persist it.

    Each cycle builds a snapshot, stores it in LATEST_GRID_PAYLOAD, bulk-inserts
    the meter and DT records into the local DB and mirrors them to Supabase.
    Errors are printed and the loop continues.

    NOTE(review): the loop sleeps 60 seconds between cycles although the
    previous docstring said "every 15s" — confirm the intended interval.
    """
    global LATEST_GRID_PAYLOAD
    preparer = PayloadPreparer()

    with app.app_context():
        db.create_all()

    print("[IOT] Telemetry Streamer Started.")

    while True:
        try:
            t0 = time.time()
            payload, m_recs, dt_recs = _parallel_generate(preparer)

            with _state_lock:
                LATEST_GRID_PAYLOAD = payload

            with app.app_context():
                db.session.bulk_insert_mappings(MeterTelemetry, m_recs)
                db.session.bulk_insert_mappings(DTTelemetry, dt_recs)
                db.session.commit()

            # Sync to Supabase if connected
            sync_to_supabase('meter_telemetry', m_recs)
            sync_to_supabase('dt_telemetry', dt_recs)

            # `elapsed` was previously never assigned, so the log line below
            # raised NameError on every cycle.
            elapsed = round(time.time() - t0, 2)
            print(f"[IOT] {datetime.now().strftime('%H:%M:%S')} | 3 Subs | 9 Feeders | 90 DTs | {len(m_recs)} Meters | {elapsed}s")
        except Exception as e:
            print(f"[GENERATOR] Error: {e}")
        time.sleep(60)


def background_iot_worker():
    """ML evaluator: re-score the latest 100 meter rows with joblib models.

    Loads ``scaler.pkl``, ``iso_forest.pkl`` and ``rf_classifier.pkl`` from the
    ``models`` folder next to this module. If loading fails the worker idles.
    Every 60 seconds, rows the isolation forest marks as -1 get the classifier's
    label and a random likelihood in [85, 99]; all others are set to "Normal"
    with a random likelihood in [5, 30].
    """
    import joblib
    import pandas as pd

    print("Background ML Evaluator Started.")

    # Computed outside the try so the except handler can always reference it.
    models_dir = os.path.join(os.path.dirname(__file__), 'models')
    try:
        # NOTE: joblib.load unpickles the files; only load trusted model files.
        scaler = joblib.load(os.path.join(models_dir, 'scaler.pkl'))
        iso_forest = joblib.load(os.path.join(models_dir, 'iso_forest.pkl'))
        rf_clf = joblib.load(os.path.join(models_dir, 'rf_classifier.pkl'))
        models_loaded = True
        print("ML Models loaded successfully.")
    except Exception as e:
        print(f"[ML-ENGINE] Model directory missing at {models_dir}. Operating in Remote Inference mode.")
        models_loaded = False

    while True:
        try:
            if models_loaded:
                with app.app_context():
                    latest_records = MeterTelemetry.query.order_by(MeterTelemetry.timestamp.desc()).limit(100).all()
                    if latest_records:
                        df_records = []
                        for r in latest_records:
                            # Map DB model to feature vector
                            df_records.append({
                                "active_kw": r.active_kw or 0.0,
                                "voltage_r": r.voltage_r or 230.0,
                                "voltage_y": r.voltage_y or 230.0,
                                "voltage_b": r.voltage_b or 230.0,
                                "current_r": r.current_r or 0.0,
                                "current_y": r.current_y or 0.0,
                                "current_b": r.current_b or 0.0,
                                "power_factor": r.power_factor or 0.95,
                                "cover_open": int(r.cover_open or 0),
                                "magnetic_interference": int(r.magnetic_interference or 0),
                                "rolling_step_change_pct": r.rolling_step_change_pct or 0.0,
                                "night_to_day_usage_ratio": r.night_to_day_usage_ratio or 1.0,
                                "vacation_flatline_indicator": int(r.vacation_flatline_indicator or 0),
                                "peer_cluster_deviation_pct": r.peer_cluster_deviation_pct or 0.0,
                                "neighborhood_outage_mask": int(r.neighborhood_outage_mask or 0),
                                "solar_ghi_w_m2": r.solar_ghi_w_m2 or 0.0,
                                "apparent_temperature_c": r.apparent_temperature_c or 25.0,
                                "karnataka_holiday_flag": int(r.karnataka_holiday_flag or 0)
                            })

                        df = pd.DataFrame(df_records)
                        X_scaled = scaler.transform(df)
                        iso_preds = iso_forest.predict(X_scaled)
                        rf_preds = rf_clf.predict(X_scaled)

                        for i, r in enumerate(latest_records):
                            if iso_preds[i] == -1:
                                r.anomaly_flag = rf_preds[i]
                                r.ml_likelihood = round(random.uniform(85.0, 99.0), 1)
                            else:
                                r.anomaly_flag = "Normal"
                                r.ml_likelihood = round(random.uniform(5.0, 30.0), 1)

                        db.session.commit()
                        print(f"[ML-ENGINE] {datetime.now().strftime('%H:%M:%S')} | Re-evaluated {len(latest_records)} meters")
            else:
                print(f"[ML-ENGINE] {datetime.now().strftime('%H:%M:%S')} | Idle (No Models)")
        except Exception as e:
            print(f"[ML-ENGINE] Error: {e}")
        time.sleep(60)


def background_forecast_worker():
    """Recompute the forecast for every feeder in the latest grid payload.

    Waits 5 seconds and retries while no grid payload exists; otherwise it
    publishes the result to LATEST_FORECAST_PAYLOAD and sleeps 15 seconds.
    Errors are printed and the loop continues.
    """
    global LATEST_FORECAST_PAYLOAD

    while True:
        try:
            with _state_lock:
                grid = LATEST_GRID_PAYLOAD.copy()

            if not grid:
                time.sleep(5)
                continue

            all_forecasts = {}
            for sub in grid.get("substations", []):
                for feeder in sub.get("feeders", []):
                    fid = feeder["feeder_id"]
                    forecast = forecast_engine.generate_forecast_payload(fid, grid)
                    all_forecasts[fid] = forecast

            with _state_lock:
                LATEST_FORECAST_PAYLOAD = {
                    "generated_at": datetime.now().isoformat(),
                    "feeder_count": len(all_forecasts),
                    "forecasts": all_forecasts
                }

            print(f"[FORECAST] {datetime.now().strftime('%H:%M:%S')} | {len(all_forecasts)} feeders updated")
        except Exception as e:
            print(f"[FORECAST] Error: {e}")
        time.sleep(15)


# ==========================================
# FLASK API ROUTES
# ==========================================
@app.route('/api/grid/live')
def get_live_grid():
    """Return the latest grid payload, or a "loading" status if none exists yet."""
    with _state_lock:
        payload = LATEST_GRID_PAYLOAD.copy()
    if not payload:
        return jsonify({"status": "loading", "message": "Grid initializing..."}), 200
    return jsonify({"status": "success", "data": payload})


@app.route('/api/grid/stream')
def stream_grid():
    """SSE endpoint: emit the grid payload whenever its timestamp changes.

    The generator polls the shared state every 2 seconds.
    """
    def event_stream():
        last_ts = None
        while True:
            with _state_lock:
                current_ts = LATEST_GRID_PAYLOAD.get('metadata', {}).get('timestamp')
                payload = LATEST_GRID_PAYLOAD.copy()

            if current_ts and current_ts != last_ts:
                yield f"data: {json.dumps(payload)}\n\n"
                last_ts = current_ts
            time.sleep(2)

    return Response(event_stream(), mimetype="text/event-stream",
                    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


@app.route('/api/meters/anomalies')
def get_anomalies():
    """Return the latest reading per meter, filtered by detection layer.

    Query parameter ``layer`` (default ``'3'``) selects which meters are listed:
    ``'1'`` all, ``'2'`` meters whose absolute peer deviation exceeds 20,
    ``'3'`` meters with a non-"Normal" anomaly flag, ``'4'`` anomalies with a
    likelihood of at least 85. The funnel counts always cover all four layers.
    """
    layer = request.args.get('layer', '3')

    # Get the latest unique meter readings from the database
    from sqlalchemy import func
    subq = db.session.query(
        MeterTelemetry.meter_id,
        func.max(MeterTelemetry.id).label('max_id')
    ).group_by(MeterTelemetry.meter_id).subquery()

    latest_meters = db.session.query(MeterTelemetry).join(
        subq, MeterTelemetry.id == subq.c.max_id
    ).all()

    layer_meters = []
    total_meters = 0
    spatial_outliers = 0
    anomalies_count = 0
    high_risk_anomalies = 0

    for m in latest_meters:
        total_meters += 1
        is_anomaly = m.anomaly_flag is not None and m.anomaly_flag != "Normal"
        likelihood = m.ml_likelihood or 0.0
        is_spatial = abs(m.peer_cluster_deviation_pct or 0) > 20.0

        if is_spatial:
            spatial_outliers += 1
        if is_anomaly:
            anomalies_count += 1

        is_high_risk = is_anomaly and likelihood >= 85.0
        if is_high_risk:
            high_risk_anomalies += 1

        include_meter = False
        if layer == '1':
            include_meter = True
        elif layer == '2':
            include_meter = is_spatial
        elif layer == '3':
            include_meter = is_anomaly
        elif layer == '4':
            include_meter = is_high_risk

        if include_meter:
            # A NULL active_kw would make the multiplication raise TypeError,
            # so it is treated like a missing multiplier.
            if m.rupee_leakage_multiplier and m.active_kw is not None:
                estimated_leakage_kw = round(m.rupee_leakage_multiplier * m.active_kw, 2)
            else:
                estimated_leakage_kw = 0

            layer_meters.append({
                "meter_id": m.meter_id,
                "consumer_type": m.consumer_type,
                "anomaly_flag": m.anomaly_flag or "Normal",
                "active_kw": m.active_kw,
                "power_factor": m.power_factor,
                "voltage_b": m.voltage_b,
                "ml_likelihood": round(likelihood, 1),
                "estimated_leakage_kw": estimated_leakage_kw,
                "lat": m.lat,
                "lng": m.lng,
                "tariff_slab": m.consumer_tariff_slab,
                "subsidy_flag": m.gruha_jyothi_subsidy_flag
            })

    # Highest likelihood first.
    layer_meters = sorted(layer_meters, key=lambda x: x.get("ml_likelihood", 0), reverse=True)

    funnel = [
        {"label": 'Layer 1: Statistical', "stat": f"{total_meters}", "sub": 'Meters/7d Rolling', "layerIndex": '1'},
        {"label": 'Layer 2: Spatial Peer', "stat": f"{spatial_outliers}", "sub": 'K-Means Outliers', "layerIndex": '2'},
        {"label": 'Layer 3: Behavioral', "stat": f"{anomalies_count}", "sub": 'Isolation Forest Flag', "layerIndex": '3'},
        {"label": 'Final: Leakage Triage', "stat": f"{high_risk_anomalies}", "sub": 'Section 135 Targets', "layerIndex": '4'}
    ]

    return jsonify({"status": "success", "data": layer_meters, "funnel": funnel, "activeLayer": layer})


@app.route('/api/analytics/forecast')
def get_forecast():
    """Return the forecast for one feeder, or the whole forecast payload.

    When ``feeder_id`` is given and present in the forecasts, only that entry
    is returned; otherwise the full payload is returned.
    """
    feeder_id = request.args.get('feeder_id')
    with _state_lock:
        forecasts = LATEST_FORECAST_PAYLOAD.copy()

    if not forecasts:
        return jsonify({"status": "loading", "message": "Forecast not ready"}), 200

    if feeder_id and feeder_id in forecasts.get("forecasts", {}):
        return jsonify({"status": "success", "data": forecasts["forecasts"][feeder_id]})

    return jsonify({"status": "success", "data": forecasts})


@app.route('/api/analytics/forecast/stream')
def stream_forecast():
    """SSE endpoint: emit the forecast payload whenever ``generated_at`` changes.

    The generator polls the shared state every 2 seconds.
    """
    def event_stream():
        last_ts = None
        while True:
            with _state_lock:
                current_ts = LATEST_FORECAST_PAYLOAD.get('generated_at')
                payload = LATEST_FORECAST_PAYLOAD.copy()

            if current_ts and current_ts != last_ts:
                yield f"data: {json.dumps(payload)}\n\n"
                last_ts = current_ts
            time.sleep(2)

    return Response(event_stream(), mimetype="text/event-stream",
                    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


@app.route('/api/scenarios/forecast', methods=['POST'])
def run_scenario_forecast():
    """Endpoint for Scenario Studio to fetch forecast with edge case overrides.

    Reads ``level`` (default ``'feeder'``), ``node_id`` and
    ``scenario_overrides`` from the JSON body. Returns 503 while no grid
    payload exists and 500 if the forecast engine raises.
    """
    data = request.json or {}
    level = data.get('level', 'feeder')
    node_id = data.get('node_id', None)
    scenario_overrides = data.get('scenario_overrides', {})

    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "error", "message": "Grid telemetry not initialized"}), 503

    try:
        forecast_payload = forecast_engine.generate_part_a_payload(
            live_grid=grid,
            level=level,
            node_id=node_id,
            scenario_overrides=scenario_overrides
        )
        return jsonify({"status": "success", "data": forecast_payload})
    except Exception as e:
        print(f"[SCENARIO-API] Error generating scenario forecast: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


# --- HIERARCHY EXPLORER
# ENDPOINTS ---
@app.route('/api/grid/substations')
def get_substations():
    """List every substation in the live payload with aggregate stats."""
    # Use live payload for structure, but we can augment with DB history if needed
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    subs = []
    for s in grid.get("substations", []):
        feeder_count = len(s.get("feeders", []))

        # Calculate aggregate stats from children in the live payload
        active_anoms = 0
        max_stress = 0
        total_load = 0
        for f in s.get("feeders", []):
            for dt in f.get("dts", []):
                total_load += dt.get("dt_master_meter_kw", 0)
                max_stress = max(max_stress, dt.get("stress_pct", 0))
                for m in dt.get("meters", []):
                    # A missing flag counts as "Normal", matching the map endpoint.
                    if m.get("anomaly_flag", "Normal") != "Normal":
                        active_anoms += 1

        subs.append({
            "substation_id": s["substation_id"],
            "name": s.get("name", s["substation_id"]),
            "location": s["location"],
            "feeder_count": feeder_count,
            "total_load_kw": round(total_load, 1),
            "max_dt_stress_pct": max_stress,
            "active_anomalies": active_anoms,
            "status": "critical" if max_stress > 100 or active_anoms > 10 else "warning" if max_stress > 85 else "stable"
        })

    return jsonify({"status": "success", "data": subs})


# The route needs the <sid> variable: without it Flask never supplies the
# view's required `sid` argument.
@app.route('/api/grid/feeders/<sid>')
def get_feeders(sid):
    """List the feeders of substation ``sid`` with load, theft and stress totals.

    Returns 404 when the substation is not in the live payload.
    """
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    sub = next((s for s in grid.get("substations", []) if s["substation_id"] == sid), None)
    if not sub:
        return jsonify({"status": "error", "message": "Substation not found"}), 404

    feeders = []
    for f in sub.get("feeders", []):
        dts = f.get("dts", [])
        dt_count = len(dts)
        meter_count = sum(len(dt.get("meters", [])) for dt in dts)
        load_kw = sum(dt.get("dt_master_meter_kw", 0) for dt in dts)
        theft_kw = sum(dt.get("losses", {}).get("commercial_theft_loss_kw", 0) for dt in dts)
        max_stress = max((dt.get("stress_pct", 0) for dt in dts), default=0)

        feeders.append({
            "feeder_id": f["feeder_id"],
            "dt_count": dt_count,
            "meter_count": meter_count,
            "total_load_kw": round(load_kw, 1),
            "total_theft_loss_kw": round(theft_kw, 1),
            "max_dt_stress_pct": max_stress,
            "status": "critical" if max_stress > 100 or theft_kw > 50 else "warning" if max_stress > 85 else "stable"
        })

    return jsonify({"status": "success", "data": feeders})


# <fid> supplies the view's required `fid` argument.
@app.route('/api/grid/dts/<fid>')
def get_dts(fid):
    """List the DTs of feeder ``fid``; 404 when the feeder is not found."""
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    target_f = None
    for s in grid.get("substations", []):
        for f in s.get("feeders", []):
            if f["feeder_id"] == fid:
                target_f = f
                break
        if target_f:
            break

    if not target_f:
        return jsonify({"status": "error", "message": "Feeder not found"}), 404

    dts = []
    for d in target_f.get("dts", []):
        dts.append({
            "dt_id": d["dt_id"],
            "stress_pct": d["stress_pct"],
            # The dict previously listed "meter_count" twice; the second entry
            # overrode this one and raised KeyError when "meters" was absent.
            "meter_count": len(d.get("meters", [])),
            "rated_capacity_kw": d["rated_capacity_kw"],
            "dt_master_meter_kw": d["dt_master_meter_kw"],
            "losses": d["losses"],
            "status": "critical" if d["stress_pct"] > 100 else "warning" if d["stress_pct"] > 85 else "stable"
        })

    return jsonify({"status": "success", "data": dts})


# <did> supplies the view's required `did` argument.
@app.route('/api/grid/meters/<did>')
def get_meters(did):
    """List the meters attached to DT ``did``; 404 when the DT is not found."""
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    target_dt = None
    for s in grid.get("substations", []):
        for f in s.get("feeders", []):
            for d in f.get("dts", []):
                if d["dt_id"] == did:
                    target_dt = d
                    break
            if target_dt:
                break
        if target_dt:
            break

    if not target_dt:
        return jsonify({"status": "error", "message": "DT not found"}), 404

    meters = []
    for m in target_dt.get("meters", []):
        meters.append({
            "meter_id": m["meter_id"],
            "consumer_type": m["consumer_type"],
            "active_kw": m["measurements"]["active_power_kw"],
            "reactive_kvar": m["measurements"]["reactive_power_kvar"],
            "voltage_v": m["measurements"]["voltage_v"],
            "power_factor": m["measurements"]["power_factor"],
            "anomaly_flag": m["anomaly_flag"]
        })

    return jsonify({"status": "success", "data": meters})


# <meter_id> supplies the view's required `meter_id` argument.
@app.route('/api/meters/<meter_id>/evidence')
def get_meter_evidence(meter_id):
    """Return chart series, filter states and summary info for one meter.

    Uses the meter's most recent 96 DB rows, returned in chronological order.
    Returns 404 when the meter has no rows in the database.
    """
    # Fetch historical data from DB (last 96 records = 24 hours at 15-min intervals).
    # The newest row doubles as the "latest reading", so one query is enough.
    history = MeterTelemetry.query.filter_by(meter_id=meter_id).order_by(MeterTelemetry.timestamp.desc()).limit(96).all()
    if not history:
        return jsonify({"status": "error", "message": "Meter not found in database"}), 404

    target_meter = history[0]
    history = history[::-1]  # Chronological

    labels = []
    actual_load = []
    expected_load = []
    power_factor_arr = []
    voltage_b_arr = []
    voltage_r_arr = []
    current_r_arr = []

    anomaly_flag = target_meter.anomaly_flag or "Normal"
    is_high_risk = anomaly_flag != "Normal"

    for record in history:
        labels.append(record.timestamp.strftime('%H:%M'))
        # active_kw is nullable; fall back to 0.0 like the other series do.
        actual_load.append(round(record.active_kw or 0.0, 2))
        # Expected load: flat 1.0 kW base plus 1.5 kW between 18:00 and 22:59,
        # otherwise plus 0.5 kW.
        base_kw = 1.0 + (1.5 if 18 <= record.timestamp.hour <= 22 else 0.5)
        expected_load.append(round(base_kw, 2))
        power_factor_arr.append(round(record.power_factor or 0.95, 2))
        voltage_b_arr.append(round(record.voltage_b or 230.0, 1))
        voltage_r_arr.append(round(record.voltage_r or 230.0, 1))
        current_r_arr.append(round(record.current_r or 0.0, 2))

    # Dynamic false positive filters based on actual meter state
    filters = [
        {
            "icon": 'light_mode',
            "label": 'Rooftop Solar Mask',
            "status": 'Bypassed' if is_high_risk and (target_meter.solar_ghi_w_m2 or 0) < 100 else 'Cleared',
            "reason": f"GHI: {target_meter.solar_ghi_w_m2 or 0} W/m²"
        },
        {
            "icon": 'flight',
            "label": 'Vacation Flatline',
            "status": 'Active' if target_meter.vacation_flatline_indicator else 'Inert',
            "reason": 'Fridge-only load detected' if target_meter.vacation_flatline_indicator else 'Load spikes present'
        },
        {
            "icon": 'thermostat',
            "label": 'Temperature Mask',
            "status": 'Active' if (target_meter.apparent_temperature_c or 25) > 35 else 'Inert',
            "reason": f"Apparent Temp: {target_meter.apparent_temperature_c or 25}°C"
        },
        {
            "icon": 'celebration',
            "label": 'Holiday Mask',
            "status": 'Active' if target_meter.karnataka_holiday_flag else 'Inert',
            "reason": 'Karnataka Holiday' if target_meter.karnataka_holiday_flag else 'Working Day'
        },
        {
            "icon": 'sync',
            "label": 'Tenant Changeover',
            "status": 'Bypassed' if abs(target_meter.rolling_step_change_pct or 0) > 30 else 'Inert',
            "reason": f"Step Change: {target_meter.rolling_step_change_pct or 0}%"
        }
    ]

    evidence = {
        "labels": labels,
        "actual_load_kw": actual_load,
        "expected_load_kw": expected_load,
        "power_factor": power_factor_arr,
        "voltage_b": voltage_b_arr,
        "voltage_r": voltage_r_arr,
        "current_r": current_r_arr,
        "filters": filters,
        "meter_info": {
            "meter_id": target_meter.meter_id,
            "dt_id": target_meter.dt_id,
            "consumer_type": target_meter.consumer_type,
            "tariff_slab": target_meter.consumer_tariff_slab,
            "subsidy": target_meter.gruha_jyothi_subsidy_flag,
            "lat": target_meter.lat,
            "lng": target_meter.lng,
            "anomaly_flag": anomaly_flag,
            "ml_likelihood": target_meter.ml_likelihood or 5.0,
            "magnetic_interference": target_meter.magnetic_interference or False,
            "cover_open": target_meter.cover_open or False,
            "peer_deviation": target_meter.peer_cluster_deviation_pct or 0.0,
            "rolling_step_change": target_meter.rolling_step_change_pct or 0.0
        },
        "summary": f"ML Stage-2 Classification: {anomaly_flag}. Peer deviation: {target_meter.peer_cluster_deviation_pct}%. Rolling step change: {target_meter.rolling_step_change_pct}%." if is_high_risk else "Meter historical profile aligns with expected baseline norms."
    }

    return jsonify({"status": "success", "data": evidence})


@app.route('/api/analytics/thermal_matrix')
def get_thermal_matrix():
    """Return 24h stress predictions for the children of a grid node.

    Query parameters: ``level`` (``'substation'``, ``'feeder'`` or ``'dt'``,
    default ``'substation'``) and ``id`` (optional node id). At most 15 rows
    are returned, each with a random offset of up to ±10 applied to
    ``stress_pct`` and clamped to [0, 100]. Returns an empty list while no
    grid payload exists.
    """
    level = request.args.get('level', 'substation')
    node_id = request.args.get('id', None)

    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify([])

    rows_to_show = []

    if level == 'substation':
        # Show all feeders for this substation
        for s in grid.get("substations", []):
            if not node_id or s["substation_id"] == node_id:
                for f in s.get("feeders", []):
                    rows_to_show.append({"id": f["feeder_id"], "type": "feeder"})
                if node_id:
                    break
    elif level == 'feeder':
        # Show all DTs for this feeder (or first feeder if no node_id)
        found = False
        for s in grid.get("substations", []):
            for f in s.get("feeders", []):
                if not node_id or f["feeder_id"] == node_id:
                    for dt in f.get("dts", []):
                        rows_to_show.append({"id": dt["dt_id"], "type": "dt"})
                    found = True
                    break
            if found:
                break
    elif level == 'dt':
        # Show meters for this DT (or first DT if no node_id)
        found = False
        for s in grid.get("substations", []):
            for f in s.get("feeders", []):
                for dt in f.get("dts", []):
                    if not node_id or dt["dt_id"] == node_id:
                        for m in dt.get("meters", [])[:12]:  # Show top 12 meters
                            rows_to_show.append({"id": m["meter_id"], "type": "meter"})
                        found = True
                        break
                if found:
                    break
            if found:
                break

    matrix_data = []
    # Limit to avoid overloading (e.g. max 15 rows)
    for row in rows_to_show[:15]:
        fid = row["id"]

        # Use generate_24h_stress_forecast for DTs and Feeders, scale it down for Meters
        predictions = forecast_engine.generate_24h_stress_forecast(fid, live_grid=grid)

        # Randomize slightly for meters/DTs so they don't look identical.
        # Uses the module-level `random` import; the per-iteration re-import
        # that used to sit here was redundant.
        for p in predictions:
            p["stress_pct"] = min(100, max(0, p["stress_pct"] + random.uniform(-10, 10)))

        matrix_data.append({
            "label": fid,
            "predictions": predictions
        })

    return jsonify(matrix_data)


# --- DATABASE ANALYTICS ENDPOINTS ---
@app.route('/api/db/stats')
def get_db_stats():
    """Returns historical summary stats from the database"""
    try:
        meter_count = MeterTelemetry.query.count()
        dt_count = DTTelemetry.query.count()

        avg_stress = db.session.query(db.func.avg(DTTelemetry.stress_pct)).scalar()
        total_theft = db.session.query(db.func.sum(DTTelemetry.commercial_theft_loss_kw)).scalar()

        return jsonify({
            "status": "success",
            "db_size": {
                "meters": meter_count,
                "dt_records": dt_count
            },
            "aggregates": {
                # Aggregates are None on an empty table, hence the `or 0`.
                "avg_grid_stress": round(avg_stress or 0, 1),
                "total_theft_loss_kw": round(total_theft or 0, 2)
            }
        })
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


# --- FIELD OPERATIONS & DISPATCH ---
@app.route('/api/actions/dispatch', methods=['POST'])
def dispatch_audit():
    """Acknowledge dispatch of an audit ticket for a meter.

    Reads ``meter_id`` and ``officer_id`` (default ``'OFFICER-001'``) from the
    JSON body. Returns 400 when ``meter_id`` is missing.
    """
    # `or {}` keeps a null JSON body from crashing on `.get`.
    data = request.json or {}
    meter_id = data.get('meter_id')
    officer_id = data.get('officer_id', 'OFFICER-001')

    if not meter_id:
        return jsonify({"status": "error", "message": "meter_id is required"}), 400

    # In a real app, this would save to a 'field_tickets' table
    print(f"[DISPATCH] Meter {meter_id} assigned to {officer_id}")

    return jsonify({
        "status": "success",
        "message": f"Audit ticket for {meter_id} dispatched to {officer_id}.",
        "dossier_url": f"/certificate_fir?meter={meter_id}"
    })


@app.route('/api/actions/audit', methods=['POST'])
def record_audit():
    """Log an audit action for a meter.

    Reads ``meter_id`` and ``action`` from the JSON body. Returns 400 when
    ``meter_id`` is missing.
    """
    # `or {}` keeps a null JSON body from crashing on `.get`.
    data = request.json or {}
    meter_id = data.get('meter_id')
    action = data.get('action')

    if not meter_id:
        return jsonify({"status": "error", "message": "meter_id is required"}), 400

    print(f"[AUDIT] Action {action} recorded for Meter {meter_id}")
    return jsonify({"status": "success", "message": "Audit status updated"})
@app.route('/api/compliance/cases', methods=['POST'])
def create_compliance_case():
    """Register a new compliance case and mirror it to Supabase.

    The case is stored locally with status 'detected'. The cloud sync is
    best-effort: once the local commit has succeeded, a sync failure is
    logged but the request still reports success, because the case exists.

    Returns:
        JSON with the new ``case_id``; HTTP 500 with the error message when
        the local insert fails.
    """
    # A JSON `null` body parses to None; keep .get() safe.
    data = request.json or {}

    try:
        new_case = ComplianceCase(
            meter_id=data.get('meter_id'),
            dt_id=data.get('dt_id'),
            feeder_id=data.get('feeder_id'),
            anomaly_type=data.get('anomaly_type'),
            ml_likelihood=data.get('ml_likelihood'),
            estimated_leakage_kw=data.get('estimated_leakage_kw'),
            status='detected',
            notes=data.get('notes')
        )
        db.session.add(new_case)
        db.session.commit()
        case_id = new_case.id
    except Exception as e:
        db.session.rollback()
        return jsonify({"status": "error", "message": str(e)}), 500

    # Sync the new case to Supabase. Kept outside the DB try-block so a cloud
    # failure cannot turn an already-committed case into an HTTP 500.
    try:
        sync_to_supabase('compliance_cases', {
            "meter_id": new_case.meter_id,
            "dt_id": new_case.dt_id,
            "feeder_id": new_case.feeder_id,
            "anomaly_type": new_case.anomaly_type,
            "ml_likelihood": new_case.ml_likelihood,
            "estimated_leakage_kw": new_case.estimated_leakage_kw,
            "status": 'detected',
            "notes": new_case.notes,
            # NOTE(review): passed through unchanged; confirm sync_to_supabase
            # can serialize whatever type created_at holds.
            "created_at": new_case.created_at
        })
    except Exception as e:
        print(f"[SUPABASE] Compliance case {case_id} saved locally but cloud sync failed: {e}")

    return jsonify({"status": "success", "message": "Compliance case registered", "case_id": case_id})


# The <dt_id> URL variable is required: the view takes dt_id as an argument.
@app.route('/api/db/dt/history/<dt_id>')
def get_dt_history(dt_id):
    """Returns the last 24 historical records for a DT, oldest first."""
    records = (
        DTTelemetry.query
        .filter_by(dt_id=dt_id)
        .order_by(DTTelemetry.timestamp.desc())
        .limit(24)
        .all()
    )
    history = [
        {
            "timestamp": r.timestamp.isoformat(),
            "load_kw": r.master_meter_kw,
            "stress_pct": r.stress_pct,
            "theft_kw": r.commercial_theft_loss_kw
        }
        for r in records
    ]
    # The query returns newest first; reverse into chronological order.
    return jsonify({"status": "success", "dt_id": dt_id, "history": history[::-1]})


# --- MAP ENDPOINTS ---

@app.route('/api/map/anomalies')
def get_map_anomalies():
    """Returns comprehensive grid markers (Feeders, DTs, and Meters) for the Leaflet map.

    DTs are marked 'critical' when ``stress_pct`` exceeds 85; meters when
    their ``anomaly_flag`` is anything other than "Normal". Nodes without a
    location fall back to the default coordinates (12.9716, 77.5946).
    """
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "success", "markers": []})

    default_lat, default_lng = 12.9716, 77.5946
    markers = []

    for sub in grid.get("substations", []):
        # Extract Feeders
        for fdr in sub.get("feeders", []):
            markers.append({
                "id": fdr["feeder_id"],
                "type": "feeder",
                "lat": fdr.get("location", {}).get("lat", default_lat),
                "lng": fdr.get("location", {}).get("lng", default_lng),
                "status": "normal",  # Feeders are usually aggregating
                "label": fdr["feeder_id"]
            })

            # Extract DTs
            for dt in fdr.get("dts", []):
                is_dt_anomaly = dt.get("stress_pct", 0) > 85.0
                markers.append({
                    "id": dt["dt_id"],
                    "type": "dt",
                    "lat": dt.get("location", {}).get("lat", default_lat),
                    "lng": dt.get("location", {}).get("lng", default_lng),
                    "status": "critical" if is_dt_anomaly else "normal",
                    "stress_pct": dt.get("stress_pct", 0),
                    "label": dt["dt_id"]
                })

                # Extract Meters
                for m in dt.get("meters", []):
                    is_meter_anomaly = m.get("anomaly_flag", "Normal") != "Normal"
                    measurements = m.get("measurements", {})
                    markers.append({
                        "id": m["meter_id"],
                        "type": "meter",
                        "lat": m.get("location", {}).get("lat", default_lat),
                        "lng": m.get("location", {}).get("lng", default_lng),
                        "status": "critical" if is_meter_anomaly else "normal",
                        "is_anomaly": is_meter_anomaly,
                        "anomaly_flag": m.get("anomaly_flag", "Normal"),
                        "ml_likelihood": m.get("ml_likelihood", 88.5) if is_meter_anomaly else 5.0,
                        "consumer_type": m.get("consumer_type", "Domestic"),
                        "active_kw": measurements.get("active_power_kw", 0),
                        "power_factor": measurements.get("power_factor", 0.95),
                        "tariff_slab": m.get("consumer_tariff_slab", "LT-2"),
                        "peer_deviation": m.get("peer_cluster_deviation_pct", 12.5),
                        "magnetic_interference": m.get("magnetic_interference", False),
                        "cover_open": m.get("cover_open", False),
                        "label": m["meter_id"]
                    })

    return jsonify({
        "status": "success",
        "markers": markers,
        "timestamp": datetime.now().isoformat()
    })


# ==========================================
# GLOBAL SEARCH API
# ==========================================

@app.route('/api/search')
def global_search():
    """Search across meters, DTs, feeders, substations.

    Case-insensitive substring match of query parameter ``q`` against node
    ids. Queries shorter than two characters are rejected with HTTP 400. The
    response carries at most 20 results plus the total number of matches.
    """
    q = request.args.get('q', '').strip().upper()
    if not q or len(q) < 2:
        return jsonify({"status": "error", "message": "Query too short"}), 400

    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "success", "results": []})

    results = []
    for sub in grid.get("substations", []):
        if q in sub["substation_id"].upper():
            results.append({
                "type": "substation",
                "id": sub["substation_id"],
                "label": sub.get("name", sub["substation_id"]),
                "page": f"station_feeder_dashboard.html?sub={sub['substation_id']}"
            })
        for fdr in sub.get("feeders", []):
            if q in fdr["feeder_id"].upper():
                results.append({
                    "type": "feeder",
                    "id": fdr["feeder_id"],
                    "label": fdr["feeder_id"],
                    "page": f"demand_forecast.html?feeder={fdr['feeder_id']}"
                })
            for dt in fdr.get("dts", []):
                if q in dt["dt_id"].upper():
                    results.append({
                        "type": "dt",
                        "id": dt["dt_id"],
                        "label": dt["dt_id"],
                        "page": f"dt_meter_dashboard.html?dt={dt['dt_id']}"
                    })
                for m in dt.get("meters", []):
                    if q in m["meter_id"].upper():
                        results.append({
                            "type": "meter",
                            "id": m["meter_id"],
                            "label": m["meter_id"],
                            "page": f"forensic_audit.html?meter={m['meter_id']}"
                        })

    return jsonify({"status": "success", "results": results[:20], "total": len(results)})


# ==========================================
# COMPLIANCE CASE MANAGEMENT
# ==========================================

@app.route('/api/compliance/cases')
def get_compliance_cases():
    """List all compliance cases with optional status filter.

    Returns up to 200 cases, newest first. Query parameter ``status``
    restricts the list to cases in that status.
    """
    status_filter = request.args.get('status', None)
    query = ComplianceCase.query.order_by(ComplianceCase.created_at.desc())
    if status_filter:
        query = query.filter_by(status=status_filter)
    cases = query.limit(200).all()
    return jsonify({"status": "success", "data": [{
        "id": c.id,
        "meter_id": c.meter_id,
        "dt_id": c.dt_id,
        "feeder_id": c.feeder_id,
        "anomaly_type": c.anomaly_type,
        "ml_likelihood": c.ml_likelihood,
        "estimated_leakage_kw": c.estimated_leakage_kw,
        "status": c.status,
        "fir_id": c.fir_id,
        "dispatch_id": c.dispatch_id,
        "officer_id": c.officer_id,
        "created_at": c.created_at.isoformat() if c.created_at else None,
        "resolved_at": c.resolved_at.isoformat() if c.resolved_at else None,
        "notes": c.notes
    } for c in cases]})


# The URL variable is required: the view takes case_id as an argument.
# NOTE(review): the int converter assumes ComplianceCase.id is an integer key.
@app.route('/api/compliance/cases/<int:case_id>', methods=['PATCH'])
def update_compliance_case(case_id):
    """Update compliance case status.

    Accepts any of ``status``, ``fir_id``, ``dispatch_id``, ``officer_id``
    and ``notes`` in the JSON body. Setting status to 'resolved' also stamps
    ``resolved_at``. Responds with 404 when the case does not exist.
    """
    case = ComplianceCase.query.get_or_404(case_id)
    data = request.json or {}
    now = datetime.now()

    if 'status' in data:
        case.status = data['status']
        if data['status'] == 'resolved':
            case.resolved_at = now

    # Plain pass-through fields: copied only when present in the request.
    for field in ('fir_id', 'dispatch_id', 'officer_id', 'notes'):
        if field in data:
            setattr(case, field, data[field])

    case.updated_at = now
    db.session.commit()
    return jsonify({"status": "success", "message": "Case updated"})


@app.route('/api/interventions/demand-response', methods=['POST'])
def demand_response():
    """
    1-Click Grid Healing: Dispatches SMS nudges to EV owners and heavy pump users
    to shift load to off-peak hours. Currently simulated (mock SMS payload).

    JSON body: ``zone_id`` (default 'ZONE-DEFAULT') and ``target_type``
    ('ev' by default; any other value is treated as the pump branch). The
    user count is random, and one line per call is appended to
    activity_log.txt next to this file on a best-effort basis.
    """
    data = request.json or {}
    zone_id = data.get('zone_id', 'ZONE-DEFAULT')
    target_type = data.get('target_type', 'ev')  # 'ev' or 'pump'

    # Simulate identifying target users via NILM appliance disaggregation
    if target_type == 'ev':
        user_count = random.randint(80, 200)
        shavable_kw = round(user_count * 3.3, 1)
        message = "BESCOM ToD Alert: Save ₹2.40/unit by charging your EV after 11 PM tonight. Grid peak expected 6-9 PM."
    else:
        user_count = random.randint(40, 120)
        shavable_kw = round(user_count * 1.5, 1)
        message = "BESCOM Advisory: Defer water pump usage to post-10 PM for reduced tariff. Peak grid stress in your area."

    log_entry = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | DEMAND-RESPONSE | Zone: {zone_id} | Type: {target_type} | Users: {user_count}\n"
    try:
        log_path = os.path.join(os.path.dirname(__file__), 'activity_log.txt')
        with open(log_path, "a", encoding="utf-8") as f:
            f.write(log_entry)
    except OSError as e:
        # Logging is best-effort: report the failure but still answer the request.
        print(f"[GRID-HEAL] Could not write activity log: {e}")

    print(f"[GRID-HEAL] Dispatched {target_type.upper()} nudge to {user_count} users in {zone_id}")

    return jsonify({
        "status": "success",
        "action": "demand_response",
        "details": {
            "zone_id": zone_id,
            "target_type": target_type,
            "users_notified": user_count,
            "estimated_peak_shave_kw": shavable_kw,
            "sms_message_template": message,
            "dispatch_time": datetime.now().isoformat()
        }
    })


@app.route('/api/interventions/smart-shedding', methods=['POST'])
def smart_shedding():
    """
    Micro-Shedding Optimizer: Recommends targeted meter-level disconnections
    of non-essential loads while keeping hospitals and essential services running.

    JSON body: ``feeder_id`` (default 'FDR-DEFAULT') and ``deficit_kw``
    (default 50.0). The output is simulated: savings are fixed fractions
    (40% / 35% / 15%) of the deficit and meter counts are random. A
    non-numeric ``deficit_kw`` is rejected with HTTP 400.
    """
    data = request.json or {}
    feeder_id = data.get('feeder_id', 'FDR-DEFAULT')
    deficit_kw = data.get('deficit_kw', 50.0)

    # Reject strings, nulls, booleans etc. up front instead of failing later
    # inside the arithmetic with an unhandled TypeError (HTTP 500).
    if isinstance(deficit_kw, bool) or not isinstance(deficit_kw, (int, float)):
        return jsonify({"status": "error", "message": "deficit_kw must be a number"}), 400

    # Simulate the optimizer's output
    shed_targets = [
        {
            "category": "Commercial Signage",
            "meter_count": random.randint(15, 40),
            "estimated_savings_kw": round(deficit_kw * 0.4, 1),
            "status": "RECOMMENDED",
            "priority": 1
        },
        {
            "category": "Non-Essential AC (Commercial)",
            "meter_count": random.randint(10, 25),
            "estimated_savings_kw": round(deficit_kw * 0.35, 1),
            "status": "RECOMMENDED",
            "priority": 2
        },
        {
            "category": "Street Lighting (Dimming)",
            "meter_count": random.randint(20, 60),
            "estimated_savings_kw": round(deficit_kw * 0.15, 1),
            "status": "OPTIONAL",
            "priority": 3
        }
    ]

    excluded = [
        {"category": "Hospitals & Clinics", "reason": "Category-1 Essential Service"},
        {"category": "Grid Substations", "reason": "Critical Infrastructure"},
        {"category": "Water Treatment Plants", "reason": "Category-1 Essential Service"}
    ]

    total_savings = sum(t["estimated_savings_kw"] for t in shed_targets)

    print(f"[MICRO-SHED] Feeder {feeder_id} | Deficit: {deficit_kw} kW | Recommended savings: {total_savings} kW")

    return jsonify({
        "status": "success",
        "action": "smart_shedding",
        "details": {
            "feeder_id": feeder_id,
            "deficit_kw": deficit_kw,
            "shed_targets": shed_targets,
            "excluded_services": excluded,
            "total_estimated_savings_kw": round(total_savings, 1),
            # Guard against division by zero for a zero/negative deficit.
            "deficit_covered_pct": round(min(100, (total_savings / deficit_kw) * 100), 1) if deficit_kw > 0 else 0
        }
    })


# ==========================================
# PART-A: GRID INTELLIGENCE & STRESS ANALYSIS
# ==========================================

@app.route('/api/forecast/part-a')
def get_part_a_forecast():
    """Return the forecast engine's Part-A payload for a grid node.

    Query parameters: ``level`` (default 'feeder') and optional ``id``.
    Answers with status 'loading' (HTTP 200) while no grid payload is
    available, and with HTTP 500 when the engine raises.
    """
    level = request.args.get('level', 'feeder')
    node_id = request.args.get('id', None)

    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify({"status": "loading", "message": "Grid initializing..."}), 200

    try:
        payload = forecast_engine.generate_part_a_payload(live_grid=grid, level=level, node_id=node_id)
        return jsonify({"status": "success", "data": payload})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


# NOTE(review): this is the second view registered on
# '/api/analytics/thermal_matrix' (get_thermal_matrix is registered earlier in
# the file). The earlier registration likely wins URL matching, which would
# make this view unreachable by URL - confirm and consolidate.
@app.route('/api/analytics/thermal_matrix')
def thermal_matrix():
    """Return per-child 24h stress predictions for a substation or feeder.

    Query parameters: ``level`` ('substation' by default, or 'feeder') and
    optional ``id``. Produces a JSON array of
    ``{"label": str, "predictions": [{"hour": int, "stress_pct": float}]}``;
    an empty array is returned when no grid is loaded or on any error.
    """
    level = request.args.get('level', 'substation')
    node_id = request.args.get('id', None)

    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()

    if not grid:
        return jsonify([])

    matrix = []
    try:
        if level == 'substation':
            # Return all feeders of the requested substation (the first
            # substation when no id is given).
            target_sub = None
            for sub in grid.get('substations', []):
                if not node_id or sub['substation_id'] == node_id:
                    target_sub = sub
                    break

            if target_sub:
                for fdr in target_sub.get('feeders', []):
                    f_fc = forecast_engine.generate_24h_stress_forecast(fdr['feeder_id'], live_grid=grid)
                    # NOTE(review): the fallback divides p90_load_kw by itself,
                    # so it does not express real stress - confirm the intent.
                    preds = [
                        {
                            "hour": p["hour"],
                            "stress_pct": p.get(
                                "stress_pct",
                                round(p["p90_load_kw"] / max(1, p["p90_load_kw"]) * 100, 1)
                            )
                        }
                        for p in f_fc
                    ]
                    matrix.append({"label": fdr['feeder_id'], "predictions": preds})

        elif level == 'feeder':
            # Return DTs for the requested feeder
            for sub in grid.get('substations', []):
                for fdr in sub.get('feeders', []):
                    if fdr['feeder_id'] == node_id:
                        for dt in fdr.get('dts', [])[:10]:  # limit to 10 DTs for UI
                            # The feeder-level forecast is reused for each DT ...
                            dt_fc = forecast_engine.generate_24h_stress_forecast(fdr['feeder_id'], live_grid=grid)
                            # ... with some random variation added per DT.
                            preds = [
                                {
                                    "hour": p["hour"],
                                    "stress_pct": min(100, max(0, p.get("stress_pct", 50) + random.randint(-15, 15)))
                                }
                                for p in dt_fc
                            ]
                            matrix.append({"label": dt['dt_id'], "predictions": preds})
                        break

        return jsonify(matrix)
    except Exception as e:
        print(f"Thermal Matrix Error: {e}")
        return jsonify([])


# ==========================================
# PAGE ROUTES (Unified from main_app.py)
# ==========================================

@app.route('/')
def root():
    """Serve the landing page."""
    return render_template('index.html')


@app.route('/explorer')
def serve_explorer():
    """Serve the explorer page."""
    return render_template('explorer.html')


@app.route('/about_project')
def about_project():
    """Serve the about page."""
    return render_template('about_project.html')


# --- Catch-all for components and pages ---
# The <path:filename> URL variable is required: the view takes filename as an
# argument, and without it this rule would collide with root() on '/'.
@app.route('/<path:filename>')
def catch_all(filename):
    """Resolve any other path to a component, a page template or a frontend file.

    Lookup order:
        1. ``*.html`` -> components/<filename>, then templates/<filename>;
        2. otherwise  -> components/<filename>.html;
        3. fallback   -> a file under the frontend directory;
        4. else HTTP 404.
    """
    # Parent-directory segments are never legitimate here; answer 404 up front
    # rather than handing such a path to the template loader.
    if '..' in filename.split('/'):
        return "File not found", 404

    if filename.endswith('.html'):
        if os.path.exists(os.path.join(components_dir, filename)):
            return render_template(f'components/{filename}')
        if os.path.exists(os.path.join(templates_dir, filename)):
            return render_template(filename)
    else:
        # Try as a component name without .html
        html_file = filename + '.html'
        if os.path.exists(os.path.join(components_dir, html_file)):
            return render_template(f'components/{html_file}')

    # Fallback to static serving
    if os.path.exists(os.path.join(frontend_dir, filename)):
        return send_from_directory(frontend_dir, filename)

    return "File not found", 404


# ==========================================
# STARTUP
# ==========================================

if __name__ == '__main__':
    print("\n" + "="*60)
    print(" VIDYUT RAKSHAK: UNIFIED COMMAND CENTER")
    print("="*60 + "\n")

    # 1. SETUP: Check if Database exists (SKIP GENERATION IN SPACE)
    db_path = os.path.join(os.path.dirname(__file__), 'database', 'smart_grid.db')
    if not os.path.exists(db_path):
        print("[INIT] Database not found. Skipping generation for standalone mode.")
        # subprocess.run([sys.executable, os.path.join(os.path.dirname(__file__), 'generate_dataset.py')], check=True)

    # 2. TRAINING: Check if Models exist (SKIP TRAINING IN SPACE)
    # models_dir = os.path.join(os.path.dirname(__file__), 'models')
    # if not os.path.exists(os.path.join(models_dir, 'rf_classifier.pkl')):
    #     print("[INIT] Intelligence missing: ML Models not found. Using remote inference fallback.")
    #     # subprocess.run([sys.executable, os.path.join(os.path.dirname(__file__), 'train_model.py')], check=True)

    # 3. BACKGROUND SERVICES
    # Daemon threads, so they do not keep the process alive on shutdown.
    threading.Thread(target=background_data_generator, daemon=True).start()
    threading.Thread(target=background_iot_worker, daemon=True).start()
    threading.Thread(target=background_forecast_worker, daemon=True).start()

    # 4. LAUNCH
    port = int(os.environ.get("PORT", 8000))
    print(f"\n[SERVER] Vidyut Raksha Platform online at: http://127.0.0.1:{port}")
    print("[SERVER] Mode: Unified (Frontend + Backend)")
    print("\n" + "-"*60)

    app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)