LiamKhoaLe committed on
Commit
b8524f4
·
1 Parent(s): e998d00

Update the service to fetch and save to the DB in real time, and create a 4-hour session window to benchmark against newly filled data

Browse files
Files changed (1) hide show
  1. app.py +129 -124
app.py CHANGED
@@ -7,7 +7,7 @@
7
 
8
  import os, json, signal, logging, threading, time
9
  from datetime import datetime, timedelta
10
- from queue import Queue, Empty
11
 
12
  import paho.mqtt.client as mqtt
13
  from dotenv import load_dotenv
@@ -34,180 +34,186 @@ MONGO_DB = os.getenv("MONGO_DB", "poptech")
34
  MONGO_COL = os.getenv("MONGO_COLLECTION", "device_clean")
35
  FETCH_PASS = os.getenv("FETCH_PASSWORD")
36
 
37
# Processing parameters, overridable through environment variables (seconds).
BATCH_SECONDS = int(os.getenv("WINDOW_SECONDS", 1800))  # note: read from WINDOW_SECONDS
EXPECTED_INTERVAL_SEC = int(os.getenv("EXPECTED_INTERVAL_SEC", 30))
TOLERANCE_SEC = int(os.getenv("TOLERANCE_SEC", 10))

RAW_CHECKPOINT_PATH = os.getenv("RAW_CHECKPOINT_PATH", "cache/checkpoint_raw.csv")
EXPORT_CSV_PATH = "mongo_cleaned_export.csv"

# os.makedirs("") raises FileNotFoundError, so only create the directory when
# the checkpoint path actually has a directory component.
_checkpoint_dir = os.path.dirname(RAW_CHECKPOINT_PATH)
if _checkpoint_dir:
    os.makedirs(_checkpoint_dir, exist_ok=True)
44
 
45
# ─────── LOGGING ───────
# Root logger at DEBUG; force=True replaces any handlers configured earlier.
logging.basicConfig(
    force=True,
    format="%(asctime)s — %(name)s — %(levelname)s — %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger("poptech-cleaner")

# The pymongo loggers are capped at WARNING so they do not flood the DEBUG output.
for noisy_logger in map(
    logging.getLogger,
    ("pymongo", "pymongo.server_selection", "pymongo.topology", "pymongo.connection"),
):
    noisy_logger.setLevel(logging.WARNING)

logger.info("🚀 PopTech FastAPI Cleaning Server starting...")
55
 
56
# ──────────── GLOBALS ─────────────────
# FastAPI application serving the HTTP endpoints.
app = FastAPI()
# Checked by the batch worker loop to know when to stop.
stop_event = threading.Event()
# Raw MQTT messages wait here until the batch worker drains them.
queue_raw = Queue()
 
60
 
61
- # ─────────── MQTT CALLBACKS ───────────
62
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to MQTT_TOPIC on success, log the code otherwise."""
    if rc != 0:
        logger.error(f"❌ MQTT connection failed: {rc}")
        return
    logger.info("✅ Connected to MQTT broker")
    client.subscribe(MQTT_TOPIC)
68
 
69
- # DEBUG MESSENGER & CHECKPOINT WRITER ─
70
def on_message(client, userdata, msg):
    """MQTT message callback: queue the raw message, log a preview, append to the checkpoint.

    The message is stamped with the current UTC time and put on ``queue_raw``
    as a dict with ``timestamp``, ``topic`` and ``payload`` keys. The preview
    log and the checkpoint write are both best-effort: a failure in either is
    logged and never prevents the message from being queued.
    """
    ts = datetime.utcnow().isoformat()
    payload = msg.payload.decode(errors="replace")
    queue_raw.put({"timestamp": ts, "topic": msg.topic, "payload": payload})
    # Preview log only; payloads that cannot be parsed are still queued above.
    try:
        data = json.loads(payload.replace('""', '"')).get("data", [])
        logger.info(f"📩 MQTT: {ts} | V={data[0] if len(data)>0 else None}V, A={data[1] if len(data)>1 else None}A, W={data[2] if len(data)>2 else None}W, mWh={data[3] if len(data)>3 else None}")
    except Exception as e:
        # Previously swallowed silently; keep it non-fatal but visible at DEBUG.
        logger.debug(f"Preview log skipped for unparseable payload: {e}")
    # Append the compact (timestamp, topic, payload) record to the raw checkpoint file.
    try:
        with open(RAW_CHECKPOINT_PATH, "a", encoding="utf-8") as f:
            f.write(f'{ts},{msg.topic},"{payload}"\n')
    except Exception as e:
        logger.error(f"❌ Failed to write checkpoint: {e}")
86
-
87
- # ───────────── PIPELINE ─────────────
88
- ## Filter and parsing payload to 4 individual variables
89
def parse_and_filter(raw_rows):
    """Parse queued raw messages into a DataFrame of numeric readings.

    Each item of ``raw_rows`` is a dict with ``timestamp``, ``topic`` and
    ``payload`` keys. A row is kept only when its topic starts with
    ``device/socket/reply/``, its JSON payload has a list under ``data``, and
    at least one of current/power/consume is neither 0 nor missing. Rows that
    fail to parse or convert to float are skipped.

    Returns:
        DataFrame with columns timestamp, id, imei, type, voltage, current,
        power, consume (empty when nothing qualifies).
    """
    rows = []
    for r in raw_rows:
        try:
            payload = json.loads(r["payload"].replace('""', '"'))
            if not r["topic"].startswith("device/socket/reply/"):
                continue
            if not isinstance(payload.get("data", []), list):
                continue
            # Pad so short data arrays still unpack into four values.
            v, a, w, c = (payload["data"] + [None] * 4)[:4]
            if all(x in (0, None) for x in (a, w, c)):
                continue
            rows.append({
                "timestamp": r["timestamp"],
                "id": payload.get("id"),
                "imei": payload.get("imei"),
                "type": payload.get("type"),
                "voltage": float(v),
                "current": float(a),
                "power": float(w),
                "consume": float(c),
            })
        except Exception:
            # Malformed JSON, missing keys or non-numeric readings: skip the row.
            # (Exception rather than a bare except, so Ctrl-C/SystemExit propagate.)
            continue
    return pd.DataFrame(rows)
110
 
111
- ## Detect and fill missing
112
def fill_missing(df):
    """Insert rows for dropped samples and estimate missing ``consume`` values.

    Steps:
      1. Parse and sort timestamps; wherever two consecutive rows are further
         apart than EXPECTED_INTERVAL_SEC + TOLERANCE_SEC, insert synthetic
         rows at multiples of the expected interval. They copy the previous
         row, with the four measurements set to NaN.
      2. Treat negative or decreasing ``consume`` readings as missing.
      3. Impute voltage/current/power with a 3-neighbour KNN imputer.
      4. Predict missing ``consume`` from voltage/current/power with a linear
         regression; fall back to a regression on elapsed seconds for anything
         still missing.

    The input frame is not modified. An empty frame is returned unchanged.
    """
    if df.empty:
        return df
    # Work on a copy so the caller's frame (possibly a slice) is never mutated.
    df = df.copy()
    # --- Normalize time ---
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df.sort_values("timestamp", inplace=True)
    # Expected sampling interval and allowed jitter
    expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
    tol = timedelta(seconds=TOLERANCE_SEC)
    # --- Step 1: insert the dropped records ---
    rows = [df.iloc[0]]
    for i in range(1, len(df)):
        prev, curr = df.iloc[i - 1]["timestamp"], df.iloc[i]["timestamp"]
        rows.append(df.iloc[i])
        if curr - prev > expected + tol:
            for j in range(1, int(round((curr - prev) / expected))):
                gap_ts = prev + j * expected
                gap = df.iloc[i - 1].copy()
                gap["timestamp"] = gap_ts
                for col in ["voltage", "current", "power", "consume"]:
                    gap[col] = np.nan
                # Insert just before the current row, which was appended last.
                rows.insert(-1, gap)
    # Re-sort by timestamp, which serves as the record identifier
    df = pd.DataFrame(rows).sort_values("timestamp").reset_index(drop=True)
    df["consume_clean"] = df["consume"]
    df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
    # --- Step 2: impute the feature columns ---
    imputer = KNNImputer(n_neighbors=3)
    df[["voltage", "current", "power"]] = imputer.fit_transform(
        df[["voltage", "current", "power"]]
    )
    # --- Step 3: primary model ---
    train = df[df["consume_clean"].notna()]
    pred = df[df["consume_clean"].isna()]
    if not train.empty and not pred.empty:
        try:
            # fit() is inside the try as well, so a training failure falls
            # through to the timestamp fallback instead of raising.
            model = LinearRegression().fit(
                train[["voltage", "current", "power"]],
                train["consume_clean"]
            )
            y_hat = model.predict(pred[["voltage", "current", "power"]])
            # Align through a Series so the assignment is matched by index
            df.loc[pred.index, "consume_clean"] = pd.Series(y_hat, index=pred.index)
        except Exception as e:
            logger.warning(f"⚠️ Primary model failed partially: {e}")
    # --- Step 4: fallback on elapsed time ---
    still_missing = df[df["consume_clean"].isna()]
    if not still_missing.empty:
        logger.warning(f"⚠️ {len(still_missing)} rows still missing. Using timestamp fallback.")
        df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds()
        fb_train = df[df["consume_clean"].notna()]
        fb_pred = df[df["consume_clean"].isna()]
        # Keep only rows with a valid ts_sec and a unique timestamp
        fb_pred = fb_pred[fb_pred["ts_sec"].notna()].drop_duplicates(subset="timestamp")
        if not fb_train.empty and not fb_pred.empty:
            fb_model = LinearRegression().fit(
                fb_train[["ts_sec"]], fb_train["consume_clean"]
            )
            y_fb = fb_model.predict(fb_pred[["ts_sec"]])
            df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index)
        # Drop the temporary elapsed-seconds column
        df.drop(columns=["ts_sec"], inplace=True)
    # --- Final result ---
    df["consume"] = df["consume_clean"]
    logger.info("🧹 fill_missing() hoàn tất làm sạch & khôi phục.")
    return df.drop(columns=["consume_clean"])
179
 
180
- ## MongoDB insertion
181
def insert_mongo(df):
    """Upsert every row of ``df`` into the configured MongoDB collection.

    Each record uses its ``timestamp`` as ``_id``, so re-inserting the same
    timestamp overwrites the stored document. Errors are logged, not raised.
    An empty frame is a no-op.
    """
    if df.empty:
        return
    client = None
    try:
        client = MongoClient(MONGO_URI)
        col = client[MONGO_DB][MONGO_COL]
        col.create_index("timestamp", unique=True)
        records = df.to_dict("records")
        for r in records:
            r["_id"] = r["timestamp"]
        operations = [
            UpdateOne({"_id": r["_id"]}, {"$set": r}, upsert=True) for r in records
        ]
        col.bulk_write(operations, ordered=False)
        logger.info(f"🪣 Inserted {len(records)} rows to MongoDB.")
    except Exception as e:
        logger.error(f"❌ Mongo insert error: {e}")
    finally:
        # A client is created per call; close it so connections are not leaked.
        if client is not None:
            client.close()
 
 
 
 
 
 
 
 
 
 
196
 
197
- ## Batch worker to insert data to MongoDB
198
def batch_worker():
    """Every BATCH_SECONDS, drain ``queue_raw``, clean the batch and store it in MongoDB.

    Runs until ``stop_event`` is set. A failure while cleaning or storing one
    batch is logged and the loop continues with the next cycle.
    """
    while not stop_event.is_set():
        # Interruptible sleep: returns early when stop_event is set, after
        # which one last drain happens before the loop exits.
        stop_event.wait(BATCH_SECONDS)
        bundle = []
        while True:
            try:
                bundle.append(queue_raw.get_nowait())
            except Empty:
                break
        if not bundle:
            logger.debug("⏱️ No new data this cycle")
            continue
        logger.info("Start cleaning 🧹")
        try:
            df_clean = fill_missing(parse_and_filter(bundle))
            insert_mongo(df_clean)
        except Exception:
            # Keep the worker thread alive; without this one bad batch would
            # silently end all future processing.
            logger.exception("❌ Batch cleaning failed")
 
 
 
 
 
 
211
 
212
  # ─────── FASTAPI ENDPOINTS ───────
213
  @app.get("/fetch")
@@ -246,8 +252,6 @@ def health():
246
 
247
  # ─────── BOOTSTRAP ───────
248
  def mqtt_main():
249
- # MQTT broker ingestion
250
- threading.Thread(target=batch_worker, daemon=True).start()
251
  client = mqtt.Client()
252
  client.username_pw_set(USERNAME, PASSWORD)
253
  client.on_connect = on_connect
@@ -263,7 +267,8 @@ if __name__ == "__main__":
263
  stop_event.set()
264
  for s in [signal.SIGINT, signal.SIGTERM]:
265
  signal.signal(s, handle_exit)
266
- # Handle data ingestion from MQTT broker
 
267
  threading.Thread(target=mqtt_main, daemon=True).start()
268
  uvicorn.run(app, host="0.0.0.0", port=7860)
269
 
 
7
 
8
  import os, json, signal, logging, threading, time
9
  from datetime import datetime, timedelta
10
+ from collections import deque
11
 
12
  import paho.mqtt.client as mqtt
13
  from dotenv import load_dotenv
 
34
  MONGO_COL = os.getenv("MONGO_COLLECTION", "device_clean")
35
  FETCH_PASS = os.getenv("FETCH_PASSWORD")
36
 
37
# Processing parameters (durations in seconds), overridable via environment variables
EXPECTED_INTERVAL_SEC = int(os.getenv("EXPECTED_INTERVAL_SEC", 30))
TOLERANCE_SEC = int(os.getenv("TOLERANCE_SEC", 10))
BUFFER_SECONDS = int(os.getenv("BUFFER_SECONDS", 4 * 3600))  # 4 hours
BACKFILL_INTERVAL = int(os.getenv("BACKFILL_INTERVAL", 10))  # 10 seconds

RAW_CHECKPOINT_PATH = os.getenv("RAW_CHECKPOINT_PATH", "cache/checkpoint_raw.csv")
EXPORT_CSV_PATH = "mongo_cleaned_export.csv"

# os.makedirs("") raises FileNotFoundError, so only create the directory when
# the checkpoint path actually has a directory component.
_checkpoint_dir = os.path.dirname(RAW_CHECKPOINT_PATH)
if _checkpoint_dir:
    os.makedirs(_checkpoint_dir, exist_ok=True)
46
 
47
# ─────────────── LOGGING ───────────────
# Root logger at INFO; force=True replaces any handlers configured earlier.
logging.basicConfig(
    force=True,
    format="%(asctime)s — %(name)s — %(levelname)s — %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("poptech-cleaner")
 
 
 
54
 
55
# ─────────────── GLOBALS ───────────────
# FastAPI application serving the HTTP endpoints.
app = FastAPI()
# Checked by the back-fill worker loop to know when to stop.
stop_event = threading.Event()
# Rolling buffer of cleaned rows: one slot per expected sample within
# BUFFER_SECONDS (4 hours by default) plus 200 spare slots.
win_len = 200 + BUFFER_SECONDS // EXPECTED_INTERVAL_SEC
window = deque(maxlen=win_len)
60
 
61
+ # ─────────────── UTILITIES ───────────────
62
+ # Đảm bảo giá trị là float, nếu không flag NaN
63
def safe_float(x):
    """Return ``float(x)``, or NaN when ``x`` cannot be converted."""
    try:
        return float(x)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures map to NaN; a bare except would also
        # swallow KeyboardInterrupt and SystemExit.
        return np.nan
 
 
66
 
67
def parse_row(ts: str, topic: str, payload: str):
    """Parse one raw MQTT message into a reading dict, or None if it is not usable.

    None is returned when the payload is not valid JSON, the topic does not
    start with ``device/socket/reply/``, ``data`` is not a list, the frame is
    idle (current, power and consume all 0 or missing), or any other error
    occurs while parsing.
    """
    try:
        message = json.loads(payload.replace('""', '"'))
        on_reply_topic = topic.startswith("device/socket/reply/")
        if not on_reply_topic or not isinstance(message.get("data", []), list):
            return None
        # Pad so short data arrays still unpack into four values.
        padded = message["data"] + [None] * 4
        voltage, current, power, consume = padded[:4]
        # Skip idle frames (nothing but 0/None in current, power, consume).
        has_signal = any(x not in (0, None) for x in (current, power, consume))
        if not has_signal:
            return None
        return {
            "timestamp": ts,
            "id": message.get("id"),
            "imei": message.get("imei"),
            "type": message.get("type"),
            "voltage": safe_float(voltage),
            "current": safe_float(current),
            "power": safe_float(power),
            "consume": safe_float(consume),
        }
    except Exception:
        return None
91
+
92
+ # Tải dữ liệu mới lên DB
93
def upsert_mongo(docs):
    """Upsert ``docs`` into the configured MongoDB collection.

    Each document is matched on ``_id`` equal to its ``timestamp`` and merged
    with ``$set``, so re-sending the same timestamp updates the stored row.
    Errors are logged, not raised. An empty or None ``docs`` is a no-op.
    """
    if not docs:
        return
    client = None
    try:
        client = MongoClient(MONGO_URI)
        col = client[MONGO_DB][MONGO_COL]
        col.create_index("timestamp", unique=True)
        ops = [UpdateOne({"_id": d["timestamp"]}, {"$set": d}, upsert=True) for d in docs]
        col.bulk_write(ops, ordered=False)
    except Exception as e:
        logger.error(f"❌ Mongo error: {e}")
    finally:
        # This runs once per stored message; close the per-call client so
        # connections are not leaked.
        if client is not None:
            client.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
 
105
+ # Chèn giá trị tổng thể
106
def fill_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Insert rows for dropped samples and estimate missing ``consume`` values.

    Steps:
      1. Parse and sort timestamps; wherever two consecutive rows are further
         apart than EXPECTED_INTERVAL_SEC + TOLERANCE_SEC, insert synthetic
         rows at multiples of the expected interval. They copy the previous
         row, with the four measurements set to NaN.
      2. Treat negative or decreasing ``consume`` readings as missing.
      3. Impute voltage/current/power with a KNN imputer.
      4. Predict missing ``consume`` from voltage/current/power with a linear
         regression; fall back to a regression on elapsed seconds for anything
         still missing.

    Returns:
        A new frame with an added boolean ``need_backfill`` column that is
        True where ``consume`` is still NaN. The input frame is not modified.
        An empty frame is returned unchanged (without ``need_backfill``).
    """
    if df.empty:
        return df
    # Work on a copy: callers pass slices such as ``tail(2)``, and writing
    # into those mutates or warns on the caller's data.
    df = df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df.sort_values("timestamp", inplace=True)

    features = ["voltage", "current", "power"]
    # Expected spacing between consecutive samples, and the allowed jitter
    expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
    tol = timedelta(seconds=TOLERANCE_SEC)

    # Rebuild the row list, adding placeholder rows wherever samples were dropped
    rows = [df.iloc[0]]
    for i in range(1, len(df)):
        prev_row, curr_row = df.iloc[i - 1], df.iloc[i]
        prev, curr = prev_row["timestamp"], curr_row["timestamp"]
        if curr - prev > expected + tol:
            for j in range(1, int(round((curr - prev) / expected))):
                gap = prev_row.copy()
                gap["timestamp"] = prev + j * expected
                for col in features + ["consume"]:
                    gap[col] = np.nan
                rows.append(gap)
        rows.append(curr_row)

    # Sort on timestamp, which serves as the record identifier
    df = pd.DataFrame(rows).sort_values("timestamp").reset_index(drop=True)
    df["consume_clean"] = df["consume"]
    df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan

    # Impute the three feature columns. KNNImputer leaves columns that are
    # entirely NaN out of its output, which would break the assignment, so
    # only columns with at least one observed value are imputed.
    observed = [c for c in features if df[c].notna().any()]
    if observed:
        non_missing = df[features].dropna().shape[0]
        k = min(3, max(1, non_missing))
        df[observed] = KNNImputer(n_neighbors=k).fit_transform(df[observed])

    # Primary model: consume ~ voltage + current + power
    train = df[df["consume_clean"].notna()]
    pred = df[df["consume_clean"].isna()]
    if not train.empty and not pred.empty:
        try:
            # fit() is inside the try as well, so a training failure falls
            # through to the timestamp fallback instead of raising.
            model = LinearRegression().fit(train[features], train["consume_clean"])
            y_hat = model.predict(pred[features])
            df.loc[pred.index, "consume_clean"] = pd.Series(y_hat, index=pred.index)
        except Exception as e:
            logger.warning(f"⚠️ Primary model error: {e}")

    # Fallback: if values are still missing, regress consume on elapsed seconds
    still = df[df["consume_clean"].isna()]
    if not still.empty:
        logger.warning(f"⚠️ {len(still)} rows still missing timestamp fallback")
        df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds()
        fb_train = df[df["consume_clean"].notna()]
        fb_pred = df[df["consume_clean"].isna()]
        fb_pred = fb_pred[fb_pred["ts_sec"].notna()].drop_duplicates(subset="timestamp")
        if not fb_train.empty and not fb_pred.empty:
            fb_model = LinearRegression().fit(fb_train[["ts_sec"]], fb_train["consume_clean"])
            y_fb = fb_model.predict(fb_pred[["ts_sec"]])
            df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index)
        df.drop(columns=["ts_sec"], inplace=True)

    # Publish the cleaned values and flag rows that still lack consume
    df["consume"] = df["consume_clean"]
    df["need_backfill"] = df["consume"].isna()
    return df.drop(columns=["consume_clean"])
165
 
166
+ # ───────────── MQTT CALLBACKS ─────────────
167
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to MQTT_TOPIC on success, log the code otherwise."""
    if rc != 0:
        logger.error(f"❌ MQTT connect failed: {rc}")
        return
    logger.info("✅ MQTT connected")
    client.subscribe(MQTT_TOPIC)
173
+
174
+ # Pipe chính debug
175
def on_message(client, userdata, msg):
    """MQTT message callback: checkpoint, clean and store one reading.

    The raw message is appended to the checkpoint file, parsed with
    ``parse_row``, cleaned against the previously stored row through
    ``fill_missing``, then appended to ``window`` and upserted to MongoDB.
    Messages that ``parse_row`` rejects are only checkpointed.
    """
    ts = datetime.utcnow().isoformat()
    payload = msg.payload.decode(errors="replace")
    # Checkpointing is best-effort: a disk error must not drop the reading.
    try:
        with open(RAW_CHECKPOINT_PATH, "a", encoding="utf-8") as f:
            f.write(f'{ts},{msg.topic},"{payload}"\n')
    except OSError as e:
        logger.error(f"❌ Failed to write checkpoint: {e}")
    row = parse_row(ts, msg.topic, payload)
    if row is None:
        return
    # Only the previous row and the new one are cleaned together, so take the
    # last stored row directly instead of building a frame of the whole window.
    try:
        recent = [window[-1]]
    except IndexError:
        recent = []
    df_filled = fill_missing(pd.DataFrame(recent + [row]))
    # NOTE(review): fill_missing may insert synthetic gap rows between the two
    # readings; only the newest row is kept here, so those gap rows are never
    # stored — confirm this is intended.
    row_clean = df_filled.tail(1).to_dict("records")[0]
    row_clean["need_backfill"] = pd.isna(row_clean["consume"])
    # Add the cleaned row to the in-memory session window and persist it
    window.append(row_clean)
    upsert_mongo([row_clean])
    logger.info(f"📥 Stored row {row_clean['timestamp']}")
192
 
193
+ # ───────────── BACK-FILL WORKER ─────────────
194
def backfill_worker():
    """Periodically re-estimate ``consume`` for window rows flagged ``need_backfill``.

    Every BACKFILL_INTERVAL seconds, until ``stop_event`` is set, the current
    window is snapshotted. The feature columns are KNN-imputed and a linear
    regression trained on the non-flagged rows predicts ``consume`` for the
    flagged ones. Updated rows are written back into the window dicts and
    upserted to MongoDB. A failing cycle is logged and the loop continues.
    """
    cols = ["voltage", "current", "power"]
    # Event.wait is an interruptible sleep: it returns True (ending the loop)
    # as soon as stop_event is set.
    while not stop_event.wait(BACKFILL_INTERVAL):
        try:
            # Snapshot the deque. The dicts are shared with ``window``, so
            # updating them below updates the window too, and positions stay
            # valid even if the deque evicts old rows in the meantime.
            rows = list(window)
            if not rows:
                # An empty window has no "need_backfill" column to read.
                continue
            df_win = pd.DataFrame(rows)
            # Force a boolean dtype so that ``~`` below is a logical negation.
            pending_mask = df_win["need_backfill"].astype(bool)
            if not pending_mask.any():
                continue
            idxs = df_win[pending_mask].index
            imputer = KNNImputer(n_neighbors=3)
            df_win[cols] = imputer.fit_transform(df_win[cols])
            train = df_win[~pending_mask]
            if train.empty:
                continue
            model = LinearRegression().fit(train[cols], train["consume"])
            df_win.loc[idxs, "consume"] = model.predict(df_win.loc[idxs, cols])
            df_win.loc[idxs, "need_backfill"] = False
            # Write the re-estimated rows back into the shared row dicts
            for i in idxs:
                rows[i].update(df_win.loc[i].to_dict())
            # Upload and merge with what is already stored in MongoDB
            upsert_mongo([rows[i] for i in idxs])
            logger.info(f"🔄 Back-filled {len(idxs)} rows")
        except Exception:
            # Keep the worker thread alive; an uncaught error here would
            # silently stop all future back-filling.
            logger.exception("❌ Back-fill cycle failed")
217
 
218
  # ─────── FASTAPI ENDPOINTS ───────
219
  @app.get("/fetch")
 
252
 
253
  # ─────── BOOTSTRAP ───────
254
  def mqtt_main():
 
 
255
  client = mqtt.Client()
256
  client.username_pw_set(USERNAME, PASSWORD)
257
  client.on_connect = on_connect
 
267
  stop_event.set()
268
  for s in [signal.SIGINT, signal.SIGTERM]:
269
  signal.signal(s, handle_exit)
270
+ # Handle data ingestion from MQTT broker, and backfiller
271
+ threading.Thread(target=backfill_worker, daemon=True).start() # quét back-fill 10s/lần
272
  threading.Thread(target=mqtt_main, daemon=True).start()
273
  uvicorn.run(app, host="0.0.0.0", port=7860)
274