LiamKhoaLe commited on
Commit
0a6246a
·
1 Parent(s): d5d7292

Update comments

Browse files
Files changed (1) hide show
  1. app.py +9 -6
app.py CHANGED
@@ -85,7 +85,7 @@ def on_message(client, userdata, msg):
85
  logger.error(f"❌ Failed to write checkpoint: {e}")
86
 
87
  # ───────────── PIPELINE ─────────────
88
- ## Filter
89
  def parse_and_filter(raw_rows):
90
  rows = []
91
  for r in raw_rows:
@@ -130,19 +130,20 @@ def fill_missing(df):
130
  df = pd.DataFrame(rows).sort_values("timestamp")
131
  df["consume_clean"] = df["consume"]
132
  df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
133
-
134
  imputer = KNNImputer(n_neighbors=3)
135
  df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]])
136
-
137
  train = df[df["consume_clean"].notna()]
138
  pred = df[df["consume_clean"].isna()]
139
  if not train.empty and not pred.empty:
140
  model = LinearRegression().fit(train[["voltage", "current", "power"]], train["consume_clean"])
141
  df.loc[pred.index, "consume_clean"] = model.predict(pred[["voltage", "current", "power"]])
142
  df["consume"] = df["consume_clean"]
 
143
  return df.drop(columns=["consume_clean"])
144
 
145
- ## Final MongoDB Saver
146
  def insert_mongo(df):
147
  if df.empty: return
148
  try:
@@ -159,7 +160,7 @@ def insert_mongo(df):
159
  except Exception as e:
160
  logger.error(f"❌ Mongo insert error: {e}")
161
 
162
- ## Batch worker looper
163
  def batch_worker():
164
  while not stop_event.is_set():
165
  time.sleep(BATCH_SECONDS)
@@ -210,6 +211,7 @@ def health():
210
 
211
  # ─────── BOOTSTRAP ───────
212
  def mqtt_main():
 
213
  threading.Thread(target=batch_worker, daemon=True).start()
214
  client = mqtt.Client()
215
  client.username_pw_set(USERNAME, PASSWORD)
@@ -218,6 +220,7 @@ def mqtt_main():
218
  client.connect(BROKER, PORT, 60)
219
  client.loop_forever()
220
 
 
221
  if __name__ == "__main__":
222
  # Set signal handlers in main thread
223
  def handle_exit(sig, _):
@@ -225,7 +228,7 @@ if __name__ == "__main__":
225
  stop_event.set()
226
  for s in [signal.SIGINT, signal.SIGTERM]:
227
  signal.signal(s, handle_exit)
228
-
229
  threading.Thread(target=mqtt_main, daemon=True).start()
230
  uvicorn.run(app, host="0.0.0.0", port=7860)
231
 
 
85
  logger.error(f"❌ Failed to write checkpoint: {e}")
86
 
87
  # ───────────── PIPELINE ─────────────
88
+ ## Filter and parsing payload to 4 individual variables
89
  def parse_and_filter(raw_rows):
90
  rows = []
91
  for r in raw_rows:
 
130
  df = pd.DataFrame(rows).sort_values("timestamp")
131
  df["consume_clean"] = df["consume"]
132
  df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
133
+ # Using KNNImputer to fit missing target data using 3 other variables
134
  imputer = KNNImputer(n_neighbors=3)
135
  df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]])
136
+ # Using LinearRegression to fit missing target data using 3 other variables
137
  train = df[df["consume_clean"].notna()]
138
  pred = df[df["consume_clean"].isna()]
139
  if not train.empty and not pred.empty:
140
  model = LinearRegression().fit(train[["voltage", "current", "power"]], train["consume_clean"])
141
  df.loc[pred.index, "consume_clean"] = model.predict(pred[["voltage", "current", "power"]])
142
  df["consume"] = df["consume_clean"]
143
+ logger.info("🧹 Handle missing function proceed")
144
  return df.drop(columns=["consume_clean"])
145
 
146
+ ## MongoDB insertion
147
  def insert_mongo(df):
148
  if df.empty: return
149
  try:
 
160
  except Exception as e:
161
  logger.error(f"❌ Mongo insert error: {e}")
162
 
163
+ ## Batch worker to insert data to MongoDB
164
  def batch_worker():
165
  while not stop_event.is_set():
166
  time.sleep(BATCH_SECONDS)
 
211
 
212
  # ─────── BOOTSTRAP ───────
213
  def mqtt_main():
214
+ # MQTT broker ingestion
215
  threading.Thread(target=batch_worker, daemon=True).start()
216
  client = mqtt.Client()
217
  client.username_pw_set(USERNAME, PASSWORD)
 
220
  client.connect(BROKER, PORT, 60)
221
  client.loop_forever()
222
 
223
+ # Handle parallel threads
224
  if __name__ == "__main__":
225
  # Set signal handlers in main thread
226
  def handle_exit(sig, _):
 
228
  stop_event.set()
229
  for s in [signal.SIGINT, signal.SIGTERM]:
230
  signal.signal(s, handle_exit)
231
+ # Handle data ingestion from MQTT broker
232
  threading.Thread(target=mqtt_main, daemon=True).start()
233
  uvicorn.run(app, host="0.0.0.0", port=7860)
234