vidyth-raksha / backend /stream_data.py
Darshan03's picture
Deployment Sync: Vidyut Rakshak Platform
f88c990
import os
import time
import threading
import json
import random
import subprocess
import sys
from datetime import datetime
from flask import Flask, jsonify, request, render_template, Response, send_from_directory
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from concurrent.futures import ThreadPoolExecutor
# Import local modules from backend folder
from payload_preparation.main import PayloadPreparer
from forecast_engine.main import ForecastEngine
from dotenv import load_dotenv
from supabase import create_client, Client
load_dotenv()
# ==========================================
# APP CONFIGURATION
# ==========================================
# Frontend templates live in ../frontend/templates relative to this backend
# folder; static assets are served from the backend's own ./static directory.
frontend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'frontend'))
templates_dir = os.path.join(frontend_dir, 'templates')
components_dir = os.path.join(templates_dir, 'components')
static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static'))

app = Flask(
    __name__,
    template_folder=templates_dir,
    static_folder=static_dir,
    static_url_path='/static',
)
CORS(app)
# ... removed from here ...

# The SQLite database file lives in ./database next to this module.
basedir = os.path.abspath(os.path.dirname(__file__))
db_dir = os.path.join(basedir, 'database')
# exist_ok avoids the check-then-create race when two processes start together.
os.makedirs(db_dir, exist_ok=True)

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(db_dir, 'smart_grid.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    "connect_args": {"timeout": 30},
    "pool_pre_ping": True,
}
db = SQLAlchemy(app)

# Enable WAL mode for SQLite to handle concurrent writes
with app.app_context():
    from sqlalchemy import text
    db.session.execute(text("PRAGMA journal_mode=WAL"))
    db.session.commit()
# In-memory snapshots shared between the background workers and the API
# routes; every read/write of the two payload globals goes through _state_lock.
_state_lock = threading.Lock()
LATEST_GRID_PAYLOAD = {}
LATEST_FORECAST_PAYLOAD = {}

# Optional Supabase mirror: `supabase` stays None unless both environment
# variables are set and the client can be created.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase: Client = None
if not (SUPABASE_URL and SUPABASE_KEY):
    print("[SUPABASE] No cloud credentials found in .env. Falling back to local SQLite only.")
else:
    try:
        supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
        print(f"[SUPABASE] Successfully initialized connection to {SUPABASE_URL}")
    except Exception as e:
        print(f"[SUPABASE] Initialization failed: {e}")
def sync_to_supabase(table_name, data):
    """Best-effort upsert of one row (dict) or many rows (iterable of dicts) into Supabase.

    Does nothing when no Supabase client is configured. Datetime values are
    converted to ISO-8601 strings; any other value exposing ``__dict__`` is
    dropped from the row. Failures are logged and never propagated, so a
    cloud outage cannot break the caller.
    """
    if not supabase:
        return
    try:
        # Supabase expects a list of dicts for bulk insert/upsert.
        rows = [data] if isinstance(data, dict) else data
        serialised = [
            {
                key: (value.isoformat() if isinstance(value, datetime) else value)
                for key, value in row.items()
                # Datetimes are always kept; other rich objects are skipped.
                if isinstance(value, datetime) or not hasattr(value, '__dict__')
            }
            for row in rows
        ]
        if serialised:
            supabase.table(table_name).upsert(serialised).execute()
    except Exception as e:
        # We don't want to crash the main engine if cloud sync fails.
        print(f"[SUPABASE] Sync warning ({table_name}): {e}")
# Single module-level ForecastEngine instance, used by the forecast worker and
# by the scenario / thermal-matrix API routes defined later in this file.
forecast_engine = ForecastEngine()
# ==========================================
# DATABASE MODELS
# ==========================================
class MeterTelemetry(db.Model):
    """One telemetry sample for a single meter, stored in ``meter_telemetry``.

    Columns are grouped into the feature "layers" marked by the comments
    below, followed by the ML output columns that the background evaluator
    writes back onto the row.
    """
    __tablename__ = 'meter_telemetry'
    id = db.Column(db.Integer, primary_key=True)
    # Indexed because routes order/filter by timestamp and meter_id.
    timestamp = db.Column(db.DateTime, index=True)
    meter_id = db.Column(db.String(100), index=True)
    dt_id = db.Column(db.String(100))
    consumer_type = db.Column(db.String(30))
    lat = db.Column(db.Float)
    lng = db.Column(db.Float)
    # Layer 1: Hardware
    active_kw = db.Column(db.Float)
    reactive_kvar = db.Column(db.Float)
    voltage_r = db.Column(db.Float)
    voltage_y = db.Column(db.Float)
    voltage_b = db.Column(db.Float)
    current_r = db.Column(db.Float)
    current_y = db.Column(db.Float)
    current_b = db.Column(db.Float)
    power_factor = db.Column(db.Float)
    cover_open = db.Column(db.Boolean, default=False)
    magnetic_interference = db.Column(db.Boolean, default=False)
    ping_status = db.Column(db.String(20), default="ONLINE")
    # Layer 2: Behavioral
    rolling_step_change_pct = db.Column(db.Float, default=0.0)
    night_to_day_usage_ratio = db.Column(db.Float, default=1.0)
    vacation_flatline_indicator = db.Column(db.Boolean, default=False)
    # Layer 3: Spatial
    peer_cluster_deviation_pct = db.Column(db.Float, default=0.0)
    neighborhood_outage_mask = db.Column(db.Boolean, default=False)
    # Layer 4: Exogenous
    solar_ghi_w_m2 = db.Column(db.Float, default=0.0)
    apparent_temperature_c = db.Column(db.Float, default=25.0)
    karnataka_holiday_flag = db.Column(db.Boolean, default=False)
    # Layer 5: Financial
    consumer_tariff_slab = db.Column(db.String(30), default="LT-2")
    rupee_leakage_multiplier = db.Column(db.Float, default=0.0)
    gruha_jyothi_subsidy_flag = db.Column(db.Boolean, default=False)
    # ML Outputs
    # anomaly_flag is "Normal" or a classifier label; NULL until evaluated.
    anomaly_flag = db.Column(db.String(80))
    ml_likelihood = db.Column(db.Float)
    tamper_status = db.Column(db.String(200))
class DTTelemetry(db.Model):
    """One telemetry sample for a DT (identified by ``dt_id``), stored in ``dt_telemetry``."""
    __tablename__ = 'dt_telemetry'
    id = db.Column(db.Integer, primary_key=True)
    # Indexed because the history route filters by dt_id and orders by timestamp.
    timestamp = db.Column(db.DateTime, index=True)
    dt_id = db.Column(db.String(100), index=True)
    lat = db.Column(db.Float)
    lng = db.Column(db.Float)
    rated_capacity_kw = db.Column(db.Float)
    master_meter_kw = db.Column(db.Float)
    smart_meters_sum_kw = db.Column(db.Float)
    # Loss columns; commercial_theft_loss_kw and stress_pct are aggregated by /api/db/stats.
    technical_loss_kw = db.Column(db.Float)
    commercial_theft_loss_kw = db.Column(db.Float)
    stress_pct = db.Column(db.Float)
    ambient_temp = db.Column(db.Float)
    is_holiday = db.Column(db.Boolean)
class ComplianceCase(db.Model):
    """A compliance case raised against a meter, stored in ``compliance_cases``.

    Rows are created by the POST compliance route and updated by the PATCH
    route, which also stamps ``resolved_at`` when status becomes 'resolved'.
    """
    __tablename__ = 'compliance_cases'
    id = db.Column(db.Integer, primary_key=True)
    meter_id = db.Column(db.String(100), index=True)
    dt_id = db.Column(db.String(100))
    feeder_id = db.Column(db.String(100))
    anomaly_type = db.Column(db.String(100))
    ml_likelihood = db.Column(db.Float)
    estimated_leakage_kw = db.Column(db.Float)
    status = db.Column(db.String(30), default='detected')  # detected, dispatched, inspected, resolved, false_positive
    fir_id = db.Column(db.String(100))
    dispatch_id = db.Column(db.String(100))
    officer_id = db.Column(db.String(100))
    # Naive local timestamps (datetime.now); updated_at refreshes on every UPDATE.
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
    resolved_at = db.Column(db.DateTime)
    notes = db.Column(db.Text)
# ==========================================
# BACKGROUND WORKERS
# ==========================================
def _parallel_generate(preparer):
now = datetime.now()
hour = now.hour
weather_ctx = preparer.weather.get_weather_context()
behavior_ctx = preparer.behavior.get_behavioral_context()
substations = preparer.spatial.get_grid_infrastructure()
all_sub_entries = []
all_m_recs = []
all_dt_recs = []
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [
executor.submit(preparer.generate_substation_node, sub, now, hour, weather_ctx, behavior_ctx)
for sub in substations
]
for future in futures:
sub_entry, m_recs, dt_recs = future.result()
all_sub_entries.append(sub_entry)
all_m_recs.extend(m_recs)
all_dt_recs.extend(dt_recs)
payload = {
"metadata": {
"timestamp": now.isoformat(),
"substations": len(substations),
"total_meters": len(all_m_recs),
"total_dts": len(all_dt_recs)
},
"environment": {
"weather": weather_ctx,
"behavioral": behavior_ctx
},
"substations": all_sub_entries
}
return payload, all_m_recs, all_dt_recs
def background_data_generator():
    """Background loop that generates, publishes and persists grid telemetry.

    Each cycle builds a fresh snapshot, swaps it into LATEST_GRID_PAYLOAD
    under the state lock, bulk-inserts the meter and DT records into the local
    database, mirrors them to Supabase (no-op when not configured) and logs
    the cycle duration. The loop then sleeps 60 seconds. Any exception is
    logged and the loop continues with the next cycle.
    """
    global LATEST_GRID_PAYLOAD
    preparer = PayloadPreparer()
    with app.app_context():
        db.create_all()
    print("[IOT] Telemetry Streamer Started.")
    while True:
        try:
            t0 = time.time()
            payload, m_recs, dt_recs = _parallel_generate(preparer)
            # Publish first so API readers see the new snapshot even if persistence fails.
            with _state_lock:
                LATEST_GRID_PAYLOAD = payload
            with app.app_context():
                db.session.bulk_insert_mappings(MeterTelemetry, m_recs)
                db.session.bulk_insert_mappings(DTTelemetry, dt_recs)
                db.session.commit()
            # Sync to Supabase if connected
            sync_to_supabase('meter_telemetry', m_recs)
            sync_to_supabase('dt_telemetry', dt_recs)
            # Previously `elapsed` was never assigned, so every cycle ended in a
            # NameError that was reported as a generator error.
            elapsed = round(time.time() - t0, 2)
            # NOTE(review): the substation/feeder/DT counts in this log line are hard-coded.
            print(f"[IOT] {datetime.now().strftime('%H:%M:%S')} | 3 Subs | 9 Feeders | 90 DTs | {len(m_recs)} Meters | {elapsed}s")
        except Exception as e:
            print(f"[GENERATOR] Error: {e}")
        time.sleep(60)
def background_iot_worker():
    """Background ML evaluator for recent meter telemetry.

    Loads the scaler, isolation forest and random-forest classifier from the
    ``models`` directory next to this file. When loading succeeds, every 60
    seconds the 100 most recent ``MeterTelemetry`` rows (by timestamp) are
    scored: rows the isolation forest marks as outliers (-1) receive the
    classifier's label as ``anomaly_flag``, all others are set to "Normal".
    When loading fails the loop only logs an idle message.
    """
    import joblib
    import pandas as pd

    print("Background ML Evaluator Started.")
    # Computed outside the try so the failure message can always reference it.
    models_dir = os.path.join(os.path.dirname(__file__), 'models')
    try:
        scaler = joblib.load(os.path.join(models_dir, 'scaler.pkl'))
        iso_forest = joblib.load(os.path.join(models_dir, 'iso_forest.pkl'))
        rf_clf = joblib.load(os.path.join(models_dir, 'rf_classifier.pkl'))
        models_loaded = True
        print("ML Models loaded successfully.")
    except Exception as e:
        print(f"[ML-ENGINE] Model directory missing at {models_dir}. Operating in Remote Inference mode.")
        # Any load failure lands here, not only a missing directory, so
        # surface the real cause as well.
        print(f"[ML-ENGINE] Model load error: {e}")
        models_loaded = False

    while True:
        try:
            if models_loaded:
                with app.app_context():
                    latest_records = MeterTelemetry.query.order_by(MeterTelemetry.timestamp.desc()).limit(100).all()
                    if latest_records:
                        # Map DB rows to feature vectors; NULL columns fall back to nominal values.
                        # NOTE(review): column order presumably must match what the scaler was fitted on.
                        df_records = [
                            {
                                "active_kw": r.active_kw or 0.0,
                                "voltage_r": r.voltage_r or 230.0,
                                "voltage_y": r.voltage_y or 230.0,
                                "voltage_b": r.voltage_b or 230.0,
                                "current_r": r.current_r or 0.0,
                                "current_y": r.current_y or 0.0,
                                "current_b": r.current_b or 0.0,
                                "power_factor": r.power_factor or 0.95,
                                "cover_open": int(r.cover_open or 0),
                                "magnetic_interference": int(r.magnetic_interference or 0),
                                "rolling_step_change_pct": r.rolling_step_change_pct or 0.0,
                                "night_to_day_usage_ratio": r.night_to_day_usage_ratio or 1.0,
                                "vacation_flatline_indicator": int(r.vacation_flatline_indicator or 0),
                                "peer_cluster_deviation_pct": r.peer_cluster_deviation_pct or 0.0,
                                "neighborhood_outage_mask": int(r.neighborhood_outage_mask or 0),
                                "solar_ghi_w_m2": r.solar_ghi_w_m2 or 0.0,
                                "apparent_temperature_c": r.apparent_temperature_c or 25.0,
                                "karnataka_holiday_flag": int(r.karnataka_holiday_flag or 0),
                            }
                            for r in latest_records
                        ]
                        df = pd.DataFrame(df_records)
                        X_scaled = scaler.transform(df)
                        iso_preds = iso_forest.predict(X_scaled)
                        rf_preds = rf_clf.predict(X_scaled)
                        for r, iso_pred, rf_pred in zip(latest_records, iso_preds, rf_preds):
                            if iso_pred == -1:
                                # Coerce the numpy scalar to a plain str before binding it to the String column.
                                r.anomaly_flag = str(rf_pred)
                                # NOTE(review): likelihood is a random placeholder, not a model score.
                                r.ml_likelihood = round(random.uniform(85.0, 99.0), 1)
                            else:
                                r.anomaly_flag = "Normal"
                                r.ml_likelihood = round(random.uniform(5.0, 30.0), 1)
                        db.session.commit()
                        print(f"[ML-ENGINE] {datetime.now().strftime('%H:%M:%S')} | Re-evaluated {len(latest_records)} meters")
            else:
                print(f"[ML-ENGINE] {datetime.now().strftime('%H:%M:%S')} | Idle (No Models)")
        except Exception as e:
            print(f"[ML-ENGINE] Error: {e}")
        time.sleep(60)
def background_forecast_worker():
    """Background loop that recomputes per-feeder forecasts from the latest grid snapshot.

    Waits in 5-second steps until a grid snapshot exists, then every 15
    seconds generates one forecast per feeder and publishes the result in
    LATEST_FORECAST_PAYLOAD. Errors are logged and the loop keeps running.
    """
    global LATEST_FORECAST_PAYLOAD
    while True:
        try:
            with _state_lock:
                grid = LATEST_GRID_PAYLOAD.copy()
            if not grid:
                # Nothing published yet; retry soon instead of waiting a full cycle.
                time.sleep(5)
                continue

            all_forecasts = {
                feeder["feeder_id"]: forecast_engine.generate_forecast_payload(feeder["feeder_id"], grid)
                for sub in grid.get("substations", [])
                for feeder in sub.get("feeders", [])
            }

            with _state_lock:
                LATEST_FORECAST_PAYLOAD = {
                    "generated_at": datetime.now().isoformat(),
                    "feeder_count": len(all_forecasts),
                    "forecasts": all_forecasts
                }
            print(f"[FORECAST] {datetime.now().strftime('%H:%M:%S')} | {len(all_forecasts)} feeders updated")
        except Exception as e:
            print(f"[FORECAST] Error: {e}")
        time.sleep(15)
# ==========================================
# FLASK API ROUTES
# ==========================================
@app.route('/api/grid/live')
def get_live_grid():
    """Return the most recent grid snapshot, or a 'loading' status before the first one exists."""
    with _state_lock:
        snapshot = LATEST_GRID_PAYLOAD.copy()
    if snapshot:
        return jsonify({"status": "success", "data": snapshot})
    return jsonify({"status": "loading", "message": "Grid initializing..."}), 200
@app.route('/api/grid/stream')
def stream_grid():
    """SSE endpoint: polls the shared grid snapshot every 2 s and emits it when its timestamp changes."""
    def event_stream():
        sent_ts = None
        while True:
            # Copy under the lock, then inspect and serialise outside it.
            with _state_lock:
                snapshot = LATEST_GRID_PAYLOAD.copy()
            snapshot_ts = snapshot.get('metadata', {}).get('timestamp')
            if snapshot_ts and snapshot_ts != sent_ts:
                yield "data: " + json.dumps(snapshot) + "\n\n"
                sent_ts = snapshot_ts
            time.sleep(2)

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return Response(event_stream(), mimetype="text/event-stream", headers=sse_headers)
@app.route('/api/meters/anomalies')
def get_anomalies():
    """Return the latest reading per meter filtered by detection layer, plus funnel counts.

    Query parameter ``layer`` (default '3') selects which meters are listed:
    '1' all meters, '2' spatial outliers (absolute peer deviation > 20),
    '3' meters whose anomaly flag is set and not "Normal", '4' anomalies with
    likelihood >= 85. Any other value yields an empty list. The funnel always
    reports the counts for all four layers.
    """
    layer = request.args.get('layer', '3')
    # Get the latest unique meter readings from the database (highest id per meter).
    from sqlalchemy import func
    subq = db.session.query(
        MeterTelemetry.meter_id,
        func.max(MeterTelemetry.id).label('max_id')
    ).group_by(MeterTelemetry.meter_id).subquery()
    latest_meters = db.session.query(MeterTelemetry).join(
        subq, MeterTelemetry.id == subq.c.max_id
    ).all()

    layer_meters = []
    total_meters = len(latest_meters)
    spatial_outliers = 0
    anomalies_count = 0
    high_risk_anomalies = 0
    for m in latest_meters:
        is_anomaly = m.anomaly_flag is not None and m.anomaly_flag != "Normal"
        likelihood = m.ml_likelihood or 0.0
        is_spatial = abs(m.peer_cluster_deviation_pct or 0) > 20.0
        is_high_risk = is_anomaly and likelihood >= 85.0
        spatial_outliers += is_spatial
        anomalies_count += is_anomaly
        high_risk_anomalies += is_high_risk

        include_meter = {
            '1': True,
            '2': is_spatial,
            '3': is_anomaly,
            '4': is_high_risk,
        }.get(layer, False)
        if not include_meter:
            continue

        # active_kw is nullable; treat NULL as 0 instead of raising TypeError.
        leak_multiplier = m.rupee_leakage_multiplier
        estimated_leakage = round(leak_multiplier * (m.active_kw or 0.0), 2) if leak_multiplier else 0
        layer_meters.append({
            "meter_id": m.meter_id,
            "consumer_type": m.consumer_type,
            "anomaly_flag": m.anomaly_flag or "Normal",
            "active_kw": m.active_kw,
            "power_factor": m.power_factor,
            "voltage_b": m.voltage_b,
            "ml_likelihood": round(likelihood, 1),
            "estimated_leakage_kw": estimated_leakage,
            "lat": m.lat,
            "lng": m.lng,
            "tariff_slab": m.consumer_tariff_slab,
            "subsidy_flag": m.gruha_jyothi_subsidy_flag
        })

    # Most likely offenders first.
    layer_meters.sort(key=lambda x: x.get("ml_likelihood", 0), reverse=True)
    funnel = [
        { "label": 'Layer 1: Statistical', "stat": f"{total_meters}", "sub": 'Meters/7d Rolling', "layerIndex": '1' },
        { "label": 'Layer 2: Spatial Peer', "stat": f"{spatial_outliers}", "sub": 'K-Means Outliers', "layerIndex": '2' },
        { "label": 'Layer 3: Behavioral', "stat": f"{anomalies_count}", "sub": 'Isolation Forest Flag', "layerIndex": '3' },
        { "label": 'Final: Leakage Triage', "stat": f"{high_risk_anomalies}", "sub": 'Section 135 Targets', "layerIndex": '4' }
    ]
    return jsonify({"status": "success", "data": layer_meters, "funnel": funnel, "activeLayer": layer})
@app.route('/api/analytics/forecast')
def get_forecast():
    """Return the forecast for ``feeder_id`` when it is known, otherwise the whole forecast payload."""
    feeder_id = request.args.get('feeder_id')
    with _state_lock:
        snapshot = LATEST_FORECAST_PAYLOAD.copy()
    if not snapshot:
        return jsonify({"status": "loading", "message": "Forecast not ready"}), 200

    per_feeder = snapshot.get("forecasts", {})
    if feeder_id and feeder_id in per_feeder:
        return jsonify({"status": "success", "data": per_feeder[feeder_id]})
    # Unknown or missing feeder id falls back to the full payload.
    return jsonify({"status": "success", "data": snapshot})
@app.route('/api/analytics/forecast/stream')
def stream_forecast():
    """SSE endpoint: polls the forecast payload every 2 s and emits it when ``generated_at`` changes."""
    def event_stream():
        sent_ts = None
        while True:
            # Copy under the lock, then inspect and serialise outside it.
            with _state_lock:
                snapshot = LATEST_FORECAST_PAYLOAD.copy()
            generated_at = snapshot.get('generated_at')
            if generated_at and generated_at != sent_ts:
                yield "data: " + json.dumps(snapshot) + "\n\n"
                sent_ts = generated_at
            time.sleep(2)

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return Response(event_stream(), mimetype="text/event-stream", headers=sse_headers)
@app.route('/api/scenarios/forecast', methods=['POST'])
def run_scenario_forecast():
    """Run a what-if forecast on the live grid using the overrides supplied in the JSON body.

    Body keys: ``level`` (default 'feeder'), ``node_id`` and
    ``scenario_overrides`` (default empty dict). Responds 503 while no grid
    snapshot exists and 500 when the forecast engine raises.
    """
    body = request.json or {}
    requested_level = body.get('level', 'feeder')
    requested_node = body.get('node_id', None)
    overrides = body.get('scenario_overrides', {})

    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()
    if not grid:
        return jsonify({"status": "error", "message": "Grid telemetry not initialized"}), 503

    try:
        result = forecast_engine.generate_part_a_payload(
            live_grid=grid,
            level=requested_level,
            node_id=requested_node,
            scenario_overrides=overrides,
        )
    except Exception as e:
        print(f"[SCENARIO-API] Error generating scenario forecast: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
    return jsonify({"status": "success", "data": result})
# --- HIERARCHY EXPLORER ENDPOINTS ---
@app.route('/api/grid/substations')
def get_substations():
    """List every substation in the live snapshot with load, stress and anomaly aggregates.

    Status is "critical" when the worst DT stress exceeds 100 or more than 10
    meters are anomalous, "warning" when stress exceeds 85, otherwise "stable".
    """
    # Use live payload for structure, but we can augment with DB history if needed
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()
    if not grid:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    subs = []
    for s in grid.get("substations", []):
        feeders = s.get("feeders", [])
        # Calculate aggregate stats from children in the live payload
        active_anoms = 0
        max_stress = 0
        total_load = 0
        for f in feeders:
            for dt in f.get("dts", []):
                total_load += dt.get("dt_master_meter_kw", 0)
                max_stress = max(max_stress, dt.get("stress_pct", 0))
                for m in dt.get("meters", []):
                    # A missing or None flag means "not evaluated", not "anomalous".
                    if (m.get("anomaly_flag") or "Normal") != "Normal":
                        active_anoms += 1

        if max_stress > 100 or active_anoms > 10:
            status = "critical"
        elif max_stress > 85:
            status = "warning"
        else:
            status = "stable"
        subs.append({
            "substation_id": s["substation_id"],
            "name": s.get("name", s["substation_id"]),
            "location": s["location"],
            "feeder_count": len(feeders),
            "total_load_kw": round(total_load, 1),
            "max_dt_stress_pct": max_stress,
            "active_anomalies": active_anoms,
            "status": status
        })
    return jsonify({"status": "success", "data": subs})
@app.route('/api/grid/feeders/<sid>')
def get_feeders(sid):
    """List the feeders of substation ``sid`` with DT/meter counts, load, theft loss and status."""
    with _state_lock:
        snapshot = LATEST_GRID_PAYLOAD.copy()
    if not snapshot:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    substation = None
    for candidate in snapshot.get("substations", []):
        if candidate["substation_id"] == sid:
            substation = candidate
            break
    if not substation:
        return jsonify({"status": "error", "message": "Substation not found"}), 404

    def summarise(feeder):
        dts = feeder.get("dts", [])
        meter_total = sum(len(dt.get("meters", [])) for dt in dts)
        load_kw = sum(dt.get("dt_master_meter_kw", 0) for dt in dts)
        theft_kw = sum(dt.get("losses", {}).get("commercial_theft_loss_kw", 0) for dt in dts)
        worst_stress = max((dt.get("stress_pct", 0) for dt in dts), default=0)
        if worst_stress > 100 or theft_kw > 50:
            health = "critical"
        elif worst_stress > 85:
            health = "warning"
        else:
            health = "stable"
        return {
            "feeder_id": feeder["feeder_id"],
            "dt_count": len(dts),
            "meter_count": meter_total,
            "total_load_kw": round(load_kw, 1),
            "total_theft_loss_kw": round(theft_kw, 1),
            "max_dt_stress_pct": worst_stress,
            "status": health
        }

    feeders = [summarise(f) for f in substation.get("feeders", [])]
    return jsonify({"status": "success", "data": feeders})
@app.route('/api/grid/dts/<fid>')
def get_dts(fid):
    """List the DTs under feeder ``fid`` with capacity, load, losses, meter count and status.

    Responds 404 when no feeder in the live snapshot has that id. Status is
    "critical" above 100 % stress, "warning" above 85 %, otherwise "stable".
    """
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()
    if not grid:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    # First feeder with a matching id, searching substations in payload order.
    target_f = next(
        (
            f
            for s in grid.get("substations", [])
            for f in s.get("feeders", [])
            if f["feeder_id"] == fid
        ),
        None,
    )
    if not target_f:
        return jsonify({"status": "error", "message": "Feeder not found"}), 404

    dts = []
    for d in target_f.get("dts", []):
        stress = d["stress_pct"]
        dts.append({
            "dt_id": d["dt_id"],
            "stress_pct": stress,
            # The dict used to define "meter_count" twice; the later d["meters"]
            # lookup overrode this safe one and raised KeyError on DTs without meters.
            "meter_count": len(d.get("meters", [])),
            "rated_capacity_kw": d["rated_capacity_kw"],
            "dt_master_meter_kw": d["dt_master_meter_kw"],
            "losses": d["losses"],
            "status": "critical" if stress > 100 else "warning" if stress > 85 else "stable"
        })
    return jsonify({"status": "success", "data": dts})
@app.route('/api/grid/meters/<did>')
def get_meters(did):
    """List the meters attached to DT ``did`` with their live measurements and anomaly flag."""
    with _state_lock:
        snapshot = LATEST_GRID_PAYLOAD.copy()
    if not snapshot:
        return jsonify({"status": "loading", "message": "Grid telemetry initializing...", "data": []}), 200

    # First DT with a matching id, walking substations -> feeders -> DTs in order.
    matching_dts = (
        dt
        for sub in snapshot.get("substations", [])
        for feeder in sub.get("feeders", [])
        for dt in feeder.get("dts", [])
        if dt["dt_id"] == did
    )
    target_dt = next(matching_dts, None)
    if not target_dt:
        return jsonify({"status": "error", "message": "DT not found"}), 404

    def describe(meter):
        readings = meter["measurements"]
        return {
            "meter_id": meter["meter_id"],
            "consumer_type": meter["consumer_type"],
            "active_kw": readings["active_power_kw"],
            "reactive_kvar": readings["reactive_power_kvar"],
            "voltage_v": readings["voltage_v"],
            "power_factor": readings["power_factor"],
            "anomaly_flag": meter["anomaly_flag"]
        }

    meters = [describe(m) for m in target_dt.get("meters", [])]
    return jsonify({"status": "success", "data": meters})
@app.route('/api/meters/<meter_id>/evidence')
def get_meter_evidence(meter_id):
    """Return the forensic evidence bundle for one meter.

    The bundle contains up to 96 of the most recent samples as chronological
    series (load, expected load, power factor, voltages, current), the
    false-positive filter states derived from the newest sample, the meter's
    static info and a one-line summary. Responds 404 when the meter has no
    rows in the database.
    """
    # One query serves both purposes: the newest row is the "target" reading
    # and the full list is the history window.
    # NOTE(review): 96 samples equal 24 h only at a 15-minute cadence - confirm the ingest interval.
    recent = (
        MeterTelemetry.query.filter_by(meter_id=meter_id)
        .order_by(MeterTelemetry.timestamp.desc())
        .limit(96)
        .all()
    )
    if not recent:
        return jsonify({"status": "error", "message": "Meter not found in database"}), 404
    target_meter = recent[0]
    history = recent[::-1]  # Chronological

    labels = []
    actual_load = []
    expected_load = []
    power_factor_arr = []
    voltage_b_arr = []
    voltage_r_arr = []
    current_r_arr = []
    anomaly_flag = target_meter.anomaly_flag or "Normal"
    is_high_risk = anomaly_flag != "Normal"
    for record in history:
        ts = record.timestamp
        # timestamp and active_kw are nullable columns; guard both so one bad
        # row cannot turn the whole response into a 500.
        labels.append(ts.strftime('%H:%M') if ts else '')
        actual_load.append(round(record.active_kw or 0.0, 2))
        # Expected load is a simple time-of-day baseline: higher during 18:00-22:59.
        is_evening_peak = ts is not None and 18 <= ts.hour <= 22
        base_kw = 1.0 + (1.5 if is_evening_peak else 0.5)
        expected_load.append(round(base_kw, 2))
        power_factor_arr.append(round(record.power_factor or 0.95, 2))
        voltage_b_arr.append(round(record.voltage_b or 230.0, 1))
        voltage_r_arr.append(round(record.voltage_r or 230.0, 1))
        current_r_arr.append(round(record.current_r or 0.0, 2))

    # Dynamic false positive filters based on actual meter state
    filters = [
        { "icon": 'light_mode', "label": 'Rooftop Solar Mask',
          "status": 'Bypassed' if is_high_risk and (target_meter.solar_ghi_w_m2 or 0) < 100 else 'Cleared',
          "reason": f"GHI: {target_meter.solar_ghi_w_m2 or 0} W/m²" },
        { "icon": 'flight', "label": 'Vacation Flatline',
          "status": 'Active' if target_meter.vacation_flatline_indicator else 'Inert',
          "reason": 'Fridge-only load detected' if target_meter.vacation_flatline_indicator else 'Load spikes present' },
        { "icon": 'thermostat', "label": 'Temperature Mask',
          "status": 'Active' if (target_meter.apparent_temperature_c or 25) > 35 else 'Inert',
          "reason": f"Apparent Temp: {target_meter.apparent_temperature_c or 25}°C" },
        { "icon": 'celebration', "label": 'Holiday Mask',
          "status": 'Active' if target_meter.karnataka_holiday_flag else 'Inert',
          "reason": 'Karnataka Holiday' if target_meter.karnataka_holiday_flag else 'Working Day' },
        { "icon": 'sync', "label": 'Tenant Changeover',
          "status": 'Bypassed' if abs(target_meter.rolling_step_change_pct or 0) > 30 else 'Inert',
          "reason": f"Step Change: {target_meter.rolling_step_change_pct or 0}%" }
    ]
    evidence = {
        "labels": labels,
        "actual_load_kw": actual_load,
        "expected_load_kw": expected_load,
        "power_factor": power_factor_arr,
        "voltage_b": voltage_b_arr,
        "voltage_r": voltage_r_arr,
        "current_r": current_r_arr,
        "filters": filters,
        "meter_info": {
            "meter_id": target_meter.meter_id,
            "dt_id": target_meter.dt_id,
            "consumer_type": target_meter.consumer_type,
            "tariff_slab": target_meter.consumer_tariff_slab,
            "subsidy": target_meter.gruha_jyothi_subsidy_flag,
            "lat": target_meter.lat,
            "lng": target_meter.lng,
            "anomaly_flag": anomaly_flag,
            "ml_likelihood": target_meter.ml_likelihood or 5.0,
            "magnetic_interference": target_meter.magnetic_interference or False,
            "cover_open": target_meter.cover_open or False,
            "peer_deviation": target_meter.peer_cluster_deviation_pct or 0.0,
            "rolling_step_change": target_meter.rolling_step_change_pct or 0.0
        },
        "summary": f"ML Stage-2 Classification: {anomaly_flag}. Peer deviation: {target_meter.peer_cluster_deviation_pct}%. Rolling step change: {target_meter.rolling_step_change_pct}%." if is_high_risk else "Meter historical profile aligns with expected baseline norms."
    }
    return jsonify({"status": "success", "data": evidence})
@app.route('/api/analytics/thermal_matrix')
def get_thermal_matrix():
    """Return 24-hour stress predictions for the children of a grid node.

    ``level`` (default 'substation') picks what the rows are: feeders of a
    substation, DTs of a feeder, or the first 12 meters of a DT. ``id``
    selects the parent node; without it, 'substation' lists the feeders of
    every substation while 'feeder' and 'dt' use the first node found. At most
    15 rows are returned, each with random jitter applied to its stress values.
    """
    import random

    level = request.args.get('level', 'substation')
    node_id = request.args.get('id', None)
    with _state_lock:
        grid = LATEST_GRID_PAYLOAD.copy()
    if not grid:
        return jsonify([])

    def select_rows():
        substations = grid.get("substations", [])
        if level == 'substation':
            # Feeders of the requested substation, or of all substations when no id is given.
            rows = []
            for s in substations:
                if node_id and s["substation_id"] != node_id:
                    continue
                rows.extend({"id": f["feeder_id"], "type": "feeder"} for f in s.get("feeders", []))
                if node_id:
                    break
            return rows
        if level == 'feeder':
            # DTs of the requested feeder (or of the first feeder if no id is given).
            for s in substations:
                for f in s.get("feeders", []):
                    if not node_id or f["feeder_id"] == node_id:
                        return [{"id": dt["dt_id"], "type": "dt"} for dt in f.get("dts", [])]
            return []
        if level == 'dt':
            # First 12 meters of the requested DT (or of the first DT if no id is given).
            for s in substations:
                for f in s.get("feeders", []):
                    for dt in f.get("dts", []):
                        if not node_id or dt["dt_id"] == node_id:
                            return [{"id": m["meter_id"], "type": "meter"} for m in dt.get("meters", [])[:12]]
            return []
        return []

    matrix_data = []
    # Limit to avoid overloading (e.g. max 15 rows)
    for row in select_rows()[:15]:
        row_id = row["id"]
        # The same forecast call is used for every row type (feeder, DT or meter).
        predictions = forecast_engine.generate_24h_stress_forecast(row_id, live_grid=grid)
        # Randomize slightly so rows don't look identical; values stay within 0..100.
        for p in predictions:
            jittered = p["stress_pct"] + random.uniform(-10, 10)
            p["stress_pct"] = min(100, max(0, jittered))
        matrix_data.append({
            "label": row_id,
            "predictions": predictions
        })
    return jsonify(matrix_data)
# --- DATABASE ANALYTICS ENDPOINTS ---
@app.route('/api/db/stats')
def get_db_stats():
    """Return row counts and grid-wide aggregates computed from the telemetry tables."""
    try:
        row_counts = {
            "meters": MeterTelemetry.query.count(),
            "dt_records": DTTelemetry.query.count()
        }
        mean_stress = db.session.query(db.func.avg(DTTelemetry.stress_pct)).scalar()
        theft_sum = db.session.query(db.func.sum(DTTelemetry.commercial_theft_loss_kw)).scalar()
        # Aggregates are NULL on an empty table; report them as zero.
        aggregates = {
            "avg_grid_stress": round(mean_stress or 0, 1),
            "total_theft_loss_kw": round(theft_sum or 0, 2)
        }
        return jsonify({"status": "success", "db_size": row_counts, "aggregates": aggregates})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
# --- FIELD OPERATIONS & DISPATCH ---
@app.route('/api/actions/dispatch', methods=['POST'])
def dispatch_audit():
    """Acknowledge dispatch of a field audit for a meter (currently only logged).

    JSON body: ``meter_id`` (required) and ``officer_id`` (default
    'OFFICER-001'). Responds 400 when ``meter_id`` is missing.
    """
    # request.json is None for a JSON `null` body; normalise like the other endpoints.
    data = request.json or {}
    meter_id = data.get('meter_id')
    if not meter_id:
        return jsonify({"status": "error", "message": "meter_id is required"}), 400
    officer_id = data.get('officer_id', 'OFFICER-001')
    # In a real app, this would save to a 'field_tickets' table
    print(f"[DISPATCH] Meter {meter_id} assigned to {officer_id}")
    return jsonify({
        "status": "success",
        "message": f"Audit ticket for {meter_id} dispatched to {officer_id}.",
        "dossier_url": f"/certificate_fir?meter={meter_id}"
    })
@app.route('/api/actions/audit', methods=['POST'])
def record_audit():
    """Acknowledge an audit action for a meter (currently only logged).

    JSON body: ``meter_id`` (required) and ``action``. Responds 400 when
    ``meter_id`` is missing.
    """
    # request.json is None for a JSON `null` body; normalise like the other endpoints.
    data = request.json or {}
    meter_id = data.get('meter_id')
    if not meter_id:
        return jsonify({"status": "error", "message": "meter_id is required"}), 400
    action = data.get('action')
    print(f"[AUDIT] Action {action} recorded for Meter {meter_id}")
    return jsonify({"status": "success", "message": "Audit status updated"})
@app.route('/api/compliance/cases', methods=['POST'])
def create_compliance_case():
    """Create a compliance case in status 'detected' and mirror it to Supabase.

    JSON body: ``meter_id`` (required), plus optional ``dt_id``,
    ``feeder_id``, ``anomaly_type``, ``ml_likelihood``,
    ``estimated_leakage_kw`` and ``notes``. Responds 400 when ``meter_id`` is
    missing and 500 (after rolling back) when the insert fails.
    """
    # A JSON `null` body used to surface as a 500 via AttributeError; treat it
    # as an empty request and reject it explicitly instead.
    data = request.json or {}
    if not data.get('meter_id'):
        return jsonify({"status": "error", "message": "meter_id is required"}), 400
    try:
        new_case = ComplianceCase(
            meter_id=data.get('meter_id'),
            dt_id=data.get('dt_id'),
            feeder_id=data.get('feeder_id'),
            anomaly_type=data.get('anomaly_type'),
            ml_likelihood=data.get('ml_likelihood'),
            estimated_leakage_kw=data.get('estimated_leakage_kw'),
            status='detected',
            notes=data.get('notes')
        )
        db.session.add(new_case)
        db.session.commit()
        # Sync the new case to Supabase (best-effort; failures are only logged).
        sync_to_supabase('compliance_cases', {
            "meter_id": new_case.meter_id,
            "dt_id": new_case.dt_id,
            "feeder_id": new_case.feeder_id,
            "anomaly_type": new_case.anomaly_type,
            "ml_likelihood": new_case.ml_likelihood,
            "estimated_leakage_kw": new_case.estimated_leakage_kw,
            "status": 'detected',
            "notes": new_case.notes,
            "created_at": new_case.created_at
        })
        return jsonify({"status": "success", "message": "Compliance case registered", "case_id": new_case.id})
    except Exception as e:
        db.session.rollback()
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/api/db/dt/history/<dt_id>')
def get_dt_history(dt_id):
    """Return up to the 24 most recent telemetry rows for a DT, oldest first."""
    records = (
        DTTelemetry.query.filter_by(dt_id=dt_id)
        .order_by(DTTelemetry.timestamp.desc())
        .limit(24)
        .all()
    )
    history = [
        {
            # timestamp is nullable; emit null rather than raising AttributeError.
            "timestamp": r.timestamp.isoformat() if r.timestamp else None,
            "load_kw": r.master_meter_kw,
            "stress_pct": r.stress_pct,
            "theft_kw": r.commercial_theft_loss_kw
        }
        # The query returns newest first; reverse for chronological order.
        for r in reversed(records)
    ]
    return jsonify({"status": "success", "dt_id": dt_id, "history": history})
# --- MAP ENDPOINTS ---
@app.route('/api/map/anomalies')
def get_map_anomalies():
    """Build map markers for every feeder, DT and meter in the live snapshot.

    Markers are emitted depth-first: a feeder, then each of its DTs followed
    by that DT's meters. Nodes without a location fall back to the default
    coordinates (12.9716, 77.5946).
    """
    with _state_lock:
        snapshot = LATEST_GRID_PAYLOAD.copy()
    if not snapshot:
        return jsonify({"status": "success", "markers": []})

    def position(node):
        loc = node.get("location", {})
        return loc.get("lat", 12.9716), loc.get("lng", 77.5946)

    markers = []
    for sub in snapshot.get("substations", []):
        for fdr in sub.get("feeders", []):
            fdr_lat, fdr_lng = position(fdr)
            markers.append({
                "id": fdr["feeder_id"],
                "type": "feeder",
                "lat": fdr_lat,
                "lng": fdr_lng,
                "status": "normal",  # Feeders are usually aggregating
                "label": fdr["feeder_id"]
            })
            for dt in fdr.get("dts", []):
                # A DT is flagged critical above 85 % stress.
                dt_stress = dt.get("stress_pct", 0)
                dt_lat, dt_lng = position(dt)
                markers.append({
                    "id": dt["dt_id"],
                    "type": "dt",
                    "lat": dt_lat,
                    "lng": dt_lng,
                    "status": "critical" if dt_stress > 85.0 else "normal",
                    "stress_pct": dt_stress,
                    "label": dt["dt_id"]
                })
                for m in dt.get("meters", []):
                    flag = m.get("anomaly_flag", "Normal")
                    flagged = flag != "Normal"
                    readings = m.get("measurements", {})
                    m_lat, m_lng = position(m)
                    markers.append({
                        "id": m["meter_id"],
                        "type": "meter",
                        "lat": m_lat,
                        "lng": m_lng,
                        "status": "critical" if flagged else "normal",
                        "is_anomaly": flagged,
                        "anomaly_flag": flag,
                        "ml_likelihood": m.get("ml_likelihood", 88.5) if flagged else 5.0,
                        "consumer_type": m.get("consumer_type", "Domestic"),
                        "active_kw": readings.get("active_power_kw", 0),
                        "power_factor": readings.get("power_factor", 0.95),
                        "tariff_slab": m.get("consumer_tariff_slab", "LT-2"),
                        "peer_deviation": m.get("peer_cluster_deviation_pct", 12.5),
                        "magnetic_interference": m.get("magnetic_interference", False),
                        "cover_open": m.get("cover_open", False),
                        "label": m["meter_id"]
                    })

    return jsonify({
        "status": "success",
        "markers": markers,
        "timestamp": datetime.now().isoformat()
    })
# ==========================================
# GLOBAL SEARCH API
# ==========================================
@app.route('/api/search')
def global_search():
    """Case-insensitive substring search over substation, feeder, DT and meter ids.

    Query parameter ``q`` must contain at least 2 characters after trimming
    (400 otherwise). Returns at most 20 matches plus the total match count.
    """
    q = request.args.get('q', '').strip().upper()
    if len(q) < 2:
        return jsonify({"status": "error", "message": "Query too short"}), 400

    with _state_lock:
        snapshot = LATEST_GRID_PAYLOAD.copy()
    if not snapshot:
        return jsonify({"status": "success", "results": []})

    results = []

    def record(kind, ident, label, page):
        results.append({"type": kind, "id": ident, "label": label, "page": page})

    for sub in snapshot.get("substations", []):
        sub_id = sub["substation_id"]
        if q in sub_id.upper():
            record("substation", sub_id, sub.get("name", sub_id), f"station_feeder_dashboard.html?sub={sub_id}")
        for fdr in sub.get("feeders", []):
            fdr_id = fdr["feeder_id"]
            if q in fdr_id.upper():
                record("feeder", fdr_id, fdr_id, f"demand_forecast.html?feeder={fdr_id}")
            for dt in fdr.get("dts", []):
                dt_id = dt["dt_id"]
                if q in dt_id.upper():
                    record("dt", dt_id, dt_id, f"dt_meter_dashboard.html?dt={dt_id}")
                for m in dt.get("meters", []):
                    m_id = m["meter_id"]
                    if q in m_id.upper():
                        record("meter", m_id, m_id, f"forensic_audit.html?meter={m_id}")

    return jsonify({"status": "success", "results": results[:20], "total": len(results)})
# ==========================================
# COMPLIANCE CASE MANAGEMENT
# ==========================================
@app.route('/api/compliance/cases')
def get_compliance_cases():
    """Return up to 200 compliance cases, newest first.

    An optional ``status`` query parameter restricts the listing to cases
    with exactly that status. Timestamps are emitted as ISO-8601 strings,
    or ``null`` when unset.
    """

    def _iso(moment):
        """Format a timestamp for JSON, passing missing values through as None."""
        return moment.isoformat() if moment else None

    def _serialize(case):
        """Flatten one ComplianceCase row into a JSON-ready dict."""
        return {
            "id": case.id,
            "meter_id": case.meter_id,
            "dt_id": case.dt_id,
            "feeder_id": case.feeder_id,
            "anomaly_type": case.anomaly_type,
            "ml_likelihood": case.ml_likelihood,
            "estimated_leakage_kw": case.estimated_leakage_kw,
            "status": case.status,
            "fir_id": case.fir_id,
            "dispatch_id": case.dispatch_id,
            "officer_id": case.officer_id,
            "created_at": _iso(case.created_at),
            "resolved_at": _iso(case.resolved_at),
            "notes": case.notes,
        }

    wanted_status = request.args.get('status')
    cases_query = ComplianceCase.query.order_by(ComplianceCase.created_at.desc())
    if wanted_status:
        cases_query = cases_query.filter_by(status=wanted_status)
    rows = cases_query.limit(200).all()

    return jsonify({"status": "success", "data": [_serialize(row) for row in rows]})
@app.route('/api/compliance/cases/<int:case_id>', methods=['PATCH'])
def update_compliance_case(case_id):
    """Partially update a compliance case.

    Args:
        case_id: Primary key of the case, taken from the URL.

    The JSON body may contain any of ``status``, ``fir_id``, ``dispatch_id``,
    ``officer_id`` and ``notes``; only the keys present are written. Setting
    ``status`` to ``'resolved'`` also stamps ``resolved_at``. ``updated_at``
    is refreshed on every call.

    Returns:
        404 when the case does not exist, 400 when the body is valid JSON but
        not an object, otherwise a success body.
    """
    case = ComplianceCase.query.get_or_404(case_id)

    data = request.json or {}
    # A JSON list or string would pass the `in` checks below and then blow up
    # on subscripting, surfacing as a 500; reject it explicitly instead.
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Request body must be a JSON object"}), 400

    # One timestamp for the whole update so resolved_at and updated_at agree.
    now = datetime.now()

    if 'status' in data:
        case.status = data['status']
        if data['status'] == 'resolved':
            case.resolved_at = now

    for field in ('fir_id', 'dispatch_id', 'officer_id', 'notes'):
        if field in data:
            setattr(case, field, data[field])

    case.updated_at = now
    db.session.commit()
    return jsonify({"status": "success", "message": "Case updated"})
@app.route('/api/interventions/demand-response', methods=['POST'])
def demand_response():
    """
    1-Click Grid Healing: Dispatches SMS nudges to EV owners
    and heavy pump users to shift load to off-peak hours.
    Currently simulated (mock SMS payload).

    JSON body (all optional):
        zone_id: Zone to target; defaults to ``'ZONE-DEFAULT'``.
        target_type: ``'ev'`` selects the EV nudge; any other value selects
            the pump advisory. Defaults to ``'ev'``.

    The user count is drawn at random, so repeated calls return different
    numbers. Each dispatch is appended to ``activity_log.txt`` next to this
    module on a best-effort basis: a failed write is reported on stdout and
    does not fail the request.

    Returns:
        400 when the body is valid JSON but not an object, otherwise a
        success body describing the simulated dispatch.
    """
    data = request.json or {}
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Request body must be a JSON object"}), 400

    zone_id = data.get('zone_id', 'ZONE-DEFAULT')
    target_type = data.get('target_type', 'ev')  # 'ev' or 'pump'

    # Simulate identifying target users via NILM appliance disaggregation
    if target_type == 'ev':
        user_count = random.randint(80, 200)
        shavable_kw = round(user_count * 3.3, 1)
        message = "BESCOM ToD Alert: Save ₹2.40/unit by charging your EV after 11 PM tonight. Grid peak expected 6-9 PM."
    else:
        user_count = random.randint(40, 120)
        shavable_kw = round(user_count * 1.5, 1)
        message = "BESCOM Advisory: Defer water pump usage to post-10 PM for reduced tariff. Peak grid stress in your area."

    log_entry = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | DEMAND-RESPONSE | Zone: {zone_id} | Type: {target_type} | Users: {user_count}\n"
    log_path = os.path.join(os.path.dirname(__file__), 'activity_log.txt')
    try:
        # Explicit UTF-8 keeps the write independent of the platform locale;
        # backslashreplace keeps unencodable input (e.g. lone surrogates
        # from the request) from raising during the write.
        with open(log_path, "a", encoding="utf-8", errors="backslashreplace") as f:
            f.write(log_entry)
    except OSError as exc:
        # Logging stays best-effort, but the failure is no longer invisible.
        print(f"[GRID-HEAL] Could not write activity log: {exc}")

    # str() guards against a non-string target_type (e.g. a JSON number).
    print(f"[GRID-HEAL] Dispatched {str(target_type).upper()} nudge to {user_count} users in {zone_id}")

    return jsonify({
        "status": "success",
        "action": "demand_response",
        "details": {
            "zone_id": zone_id,
            "target_type": target_type,
            "users_notified": user_count,
            "estimated_peak_shave_kw": shavable_kw,
            "sms_message_template": message,
            "dispatch_time": datetime.now().isoformat()
        }
    })
@app.route('/api/interventions/smart-shedding', methods=['POST'])
def smart_shedding():
    """
    Micro-Shedding Optimizer: Recommends targeted meter-level
    disconnections of non-essential loads while keeping hospitals
    and essential services running.

    JSON body (all optional):
        feeder_id: Feeder to plan for; defaults to ``'FDR-DEFAULT'``.
        deficit_kw: Shortfall to cover, in kW; defaults to ``50.0``. Must be
            a finite number (numeric strings are accepted).

    The output is simulated: each category is assigned a fixed share of the
    deficit and a random meter count.

    Returns:
        400 when the body is not a JSON object or ``deficit_kw`` is not a
        finite number, otherwise a success body with the shed plan.
    """
    import math  # local import: only this handler needs it

    data = request.json or {}
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Request body must be a JSON object"}), 400

    feeder_id = data.get('feeder_id', 'FDR-DEFAULT')

    # deficit_kw comes straight from the client; without this check a string
    # or null would raise TypeError in the arithmetic below and return a 500.
    try:
        deficit_kw = float(data.get('deficit_kw', 50.0))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"status": "error", "message": "deficit_kw must be a number"}), 400
    if not math.isfinite(deficit_kw):
        return jsonify({"status": "error", "message": "deficit_kw must be a finite number"}), 400

    # Simulate the optimizer's output.
    # (category, meter-count range, share of deficit, status), in priority order.
    shed_plan = (
        ("Commercial Signage", (15, 40), 0.4, "RECOMMENDED"),
        ("Non-Essential AC (Commercial)", (10, 25), 0.35, "RECOMMENDED"),
        ("Street Lighting (Dimming)", (20, 60), 0.15, "OPTIONAL"),
    )
    shed_targets = [
        {
            "category": category,
            "meter_count": random.randint(*meter_range),
            "estimated_savings_kw": round(deficit_kw * share, 1),
            "status": status,
            "priority": priority,
        }
        for priority, (category, meter_range, share, status) in enumerate(shed_plan, start=1)
    ]

    excluded = [
        {"category": "Hospitals & Clinics", "reason": "Category-1 Essential Service"},
        {"category": "Grid Substations", "reason": "Critical Infrastructure"},
        {"category": "Water Treatment Plants", "reason": "Category-1 Essential Service"}
    ]

    total_savings = sum(t["estimated_savings_kw"] for t in shed_targets)
    print(f"[MICRO-SHED] Feeder {feeder_id} | Deficit: {deficit_kw} kW | Recommended savings: {total_savings} kW")

    return jsonify({
        "status": "success",
        "action": "smart_shedding",
        "details": {
            "feeder_id": feeder_id,
            "deficit_kw": deficit_kw,
            "shed_targets": shed_targets,
            "excluded_services": excluded,
            "total_estimated_savings_kw": round(total_savings, 1),
            # Guard the division: a zero or negative deficit reports 0% covered.
            "deficit_covered_pct": round(min(100, (total_savings / deficit_kw) * 100), 1) if deficit_kw > 0 else 0
        }
    })
# ==========================================
# PART-A: GRID INTELLIGENCE & STRESS ANALYSIS
# ==========================================
@app.route('/api/forecast/part-a')
def get_part_a_forecast():
    """Serve the Part-A forecast payload for one level of the grid.

    Query parameters:
        level: Forwarded to the forecast engine; defaults to ``'feeder'``.
        id: Optional node ID, forwarded as ``node_id``.

    Returns a ``loading`` body (HTTP 200) while no grid payload is available,
    a ``success`` body wrapping the engine output, or a 500 carrying the
    exception text if building the response fails.
    """
    granularity = request.args.get('level', 'feeder')
    target_id = request.args.get('id', None)

    with _state_lock:
        grid_snapshot = LATEST_GRID_PAYLOAD.copy()

    if not grid_snapshot:
        return jsonify({"status": "loading", "message": "Grid initializing..."}), 200

    try:
        forecast = forecast_engine.generate_part_a_payload(
            live_grid=grid_snapshot,
            level=granularity,
            node_id=target_id,
        )
        # Serialisation stays inside the try so its failures are reported too.
        return jsonify({"status": "success", "data": forecast})
    except Exception as exc:
        return jsonify({"status": "error", "message": str(exc)}), 500
@app.route('/api/analytics/thermal_matrix')
def thermal_matrix():
    """Build the hour-by-node stress matrix for the thermal heat-map.

    Query parameters:
        level: ``'substation'`` (default) lists the feeders of one substation;
            ``'feeder'`` lists up to 10 DTs of one feeder. Any other value
            produces an empty matrix.
        id: Node to expand. At substation level a missing id selects the
            first substation in the payload.

    Returns a JSON array of ``{label, predictions: [{hour, stress_pct}]}``
    rows. An empty array is returned when no grid payload is available or
    when anything fails while building the matrix (the error is printed).
    """
    level = request.args.get('level', 'substation')
    node_id = request.args.get('id', None)

    with _state_lock:
        grid_snapshot = LATEST_GRID_PAYLOAD.copy()
    if not grid_snapshot:
        return jsonify([])

    rows = []
    try:
        if level == 'substation':
            # First substation matching the id, or simply the first one when
            # no id was supplied.
            chosen = next(
                (
                    station
                    for station in grid_snapshot.get('substations', [])
                    if not node_id or station['substation_id'] == node_id
                ),
                None,
            )
            if chosen:
                for feeder in chosen.get('feeders', []):
                    feeder_forecast = forecast_engine.generate_24h_stress_forecast(
                        feeder['feeder_id'], live_grid=grid_snapshot
                    )
                    cells = []
                    for point in feeder_forecast:
                        hour = point["hour"]
                        p90 = point["p90_load_kw"]
                        # NOTE(review): this fallback divides the load by
                        # itself, so it is ~100 whenever the load is >= 1 --
                        # presumably a capacity figure was intended; confirm.
                        fallback = round(p90 / max(1, p90) * 100, 1)
                        cells.append({"hour": hour, "stress_pct": point.get("stress_pct", fallback)})
                    rows.append({"label": feeder['feeder_id'], "predictions": cells})

        elif level == 'feeder':
            for station in grid_snapshot.get('substations', []):
                for feeder in station.get('feeders', []):
                    if feeder['feeder_id'] != node_id:
                        continue
                    # Cap at 10 DTs to keep the UI grid readable.
                    for transformer in feeder.get('dts', [])[:10]:
                        dt_forecast = forecast_engine.generate_24h_stress_forecast(
                            feeder['feeder_id'], live_grid=grid_snapshot
                        )
                        # DT rows reuse the feeder forecast with random
                        # jitter, clamped to the 0-100 range.
                        cells = [
                            {
                                "hour": point["hour"],
                                "stress_pct": min(
                                    100,
                                    max(0, point.get("stress_pct", 50) + random.randint(-15, 15)),
                                ),
                            }
                            for point in dt_forecast
                        ]
                        rows.append({"label": transformer['dt_id'], "predictions": cells})
                    # Stop scanning this substation's feeders after the match.
                    break

        return jsonify(rows)
    except Exception as exc:
        print(f"Thermal Matrix Error: {exc}")
        return jsonify([])
# ==========================================
# PAGE ROUTES (Unified from main_app.py)
# ==========================================
@app.route('/')
def root():
    """Render ``index.html`` for the site root."""
    page_template = 'index.html'
    return render_template(page_template)
@app.route('/explorer')
def serve_explorer():
    """Render ``explorer.html`` for the ``/explorer`` route."""
    page_template = 'explorer.html'
    return render_template(page_template)
@app.route('/about_project')
def about_project():
    """Render ``about_project.html`` for the ``/about_project`` route."""
    page_template = 'about_project.html'
    return render_template(page_template)
# --- Catch-all for components and pages ---
@app.route('/<path:filename>')
def catch_all(filename):
    """Resolve any otherwise-unrouted path to a template or a frontend file.

    Lookup order:
        1. ``*.html`` paths: a template under ``components/``, then a
           template directly under the templates directory.
        2. Paths without ``.html``: a component template named
           ``<filename>.html``.
        3. Any path: a file under the frontend directory.

    Args:
        filename: The request path captured by the ``<path:...>`` converter.

    Returns:
        The rendered template or file, or ``("File not found", 404)``.
    """
    # A path with '..' segments can satisfy the os.path.exists() probes
    # below for files outside the template directories, and the template
    # loader then refuses it with TemplateNotFound -- a 500 that also reveals
    # whether the outside file exists. Answer 404 up front instead.
    # Backslashes are treated as separators so the check also holds on Windows.
    if os.pardir in filename.replace('\\', '/').split('/'):
        return "File not found", 404

    if filename.endswith('.html'):
        if os.path.exists(os.path.join(components_dir, filename)):
            return render_template(f'components/{filename}')
        if os.path.exists(os.path.join(templates_dir, filename)):
            return render_template(filename)
    else:
        # Try as a component name without .html
        html_file = filename + '.html'
        if os.path.exists(os.path.join(components_dir, html_file)):
            return render_template(f'components/{html_file}')

    # Fallback to static serving
    if os.path.exists(os.path.join(frontend_dir, filename)):
        return send_from_directory(frontend_dir, filename)

    return "File not found", 404
# ==========================================
# STARTUP
# ==========================================
if __name__ == '__main__':
    # Script entry point: print the banner, start the background workers,
    # then serve the unified frontend + backend app.
    banner_rule = "=" * 60
    print("\n" + banner_rule)
    print(" VIDYUT RAKSHAK: UNIFIED COMMAND CENTER")
    print(banner_rule + "\n")

    # 1. SETUP: only report a missing database. Dataset generation via
    #    generate_dataset.py is intentionally disabled in this deployment.
    backend_dir = os.path.dirname(__file__)
    db_path = os.path.join(backend_dir, 'database', 'smart_grid.db')
    if not os.path.exists(db_path):
        print("[INIT] Database not found. Skipping generation for standalone mode.")

    # 2. TRAINING: the model-presence check and the train_model.py step are
    #    likewise disabled in this deployment.

    # 3. BACKGROUND SERVICES: daemon threads, so they end with the process.
    for worker in (
        background_data_generator,
        background_iot_worker,
        background_forecast_worker,
    ):
        threading.Thread(target=worker, daemon=True).start()

    # 4. LAUNCH: the port comes from the PORT environment variable (default 8000).
    port = int(os.environ.get("PORT", 8000))
    print(f"\n[SERVER] Vidyut Raksha Platform online at: http://127.0.0.1:{port}")
    print("[SERVER] Mode: Unified (Frontend + Backend)")
    print("\n" + "-" * 60)
    # The reloader stays off so the worker threads above are not started twice.
    app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)