# app.py # Root API: https://binkhoale1812-poptech-cleaner.hf.space/ # Usages: ## https://binkhoale1812-poptech-cleaner.hf.space/fetch?Password=... ## https://binkhoale1812-poptech-cleaner.hf.space/load?Password=... ## https://binkhoale1812-poptech-cleaner.hf.space/delete?Password=... import os, json, signal, logging, threading, time from datetime import datetime, timedelta from collections import deque import paho.mqtt.client as mqtt from dotenv import load_dotenv import pandas as pd import numpy as np from sklearn.impute import KNNImputer from sklearn.linear_model import LinearRegression from pymongo import MongoClient, UpdateOne from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse, FileResponse from fastapi.encoders import jsonable_encoder import uvicorn # ─────── ENV CONFIG ─────── load_dotenv() BROKER = os.getenv("BROKER") PORT = int(os.getenv("PORT", 1883)) USERNAME = os.getenv("USERNAME") PASSWORD = os.getenv("PASSWORD") MQTT_TOPIC = os.getenv("MQTT_TOPIC", "device/socket/reply/#") MONGO_URI = os.getenv("MONGO_URI") MONGO_DB = os.getenv("MONGO_DB", "poptech") MONGO_COL = os.getenv("MONGO_COLLECTION", "device_clean") FETCH_PASS = os.getenv("FETCH_PASSWORD") # Tham số xử lý (thời gian) EXPECTED_INTERVAL_SEC = int(os.getenv("EXPECTED_INTERVAL_SEC", 30)) TOLERANCE_SEC = int(os.getenv("TOLERANCE_SEC", 10)) BUFFER_SECONDS = int(os.getenv("BUFFER_SECONDS", 4 * 3600)) # 4 giờ BACKFILL_INTERVAL = int(os.getenv("BACKFILL_INTERVAL", 10)) # 10 giây EXPORT_CSV_PATH = "mongo_cleaned_export.csv" # ─────────────── LOGGING ─────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s — %(name)s — %(levelname)s — %(message)s", force=True ) logger = logging.getLogger("poptech-cleaner") # ─────────────── GLOBALS ─────────────── win_len = BUFFER_SECONDS // EXPECTED_INTERVAL_SEC + 200 window = deque(maxlen=win_len) # lưu 4 giờ gần nhất stop_event = threading.Event() app = FastAPI() # ─────────────── UTILITIES ─────────────── # Đảm bảo giá trị là float, nếu không flag NaN def safe_float(x): try: return float(x) except: return np.nan def parse_row(ts: str, topic: str, payload: str): """Trả về dict đã parse hoặc None nếu không hợp lệ.""" try: j = json.loads(payload.replace('""', '"')) if not topic.startswith("device/socket/reply/"): return None if not isinstance(j.get("data", []), list): return None v, a, w, c = (j["data"] + [None] * 4)[:4] logger.info(f"📩 MQTT: {ts} | V={v}V, A={a}A, W={w}W, mWh={c}") # bỏ frame idle (all 0) if all(x in (0, None) for x in (a, w, c)): return None return { "timestamp": ts, "id": j.get("id"), "imei": j.get("imei"), "type": j.get("type"), "voltage": safe_float(v), "current": safe_float(a), "power": safe_float(w), "consume": safe_float(c) } except Exception: return None # Tải dữ liệu mới lên DB def upsert_mongo(docs): if not docs: return try: client = MongoClient(MONGO_URI) col = client[MONGO_DB][MONGO_COL] col.create_index("timestamp", unique=True) ops = [UpdateOne({"_id": d["timestamp"]}, {"$set": d}, upsert=True) for d in docs] col.bulk_write(ops, ordered=False) logger.info(f"🪣 Inserted to MongoDB.") except Exception as e: logger.error(f"❌ Mongo error: {e}") # Chèn giá trị tổng thể def fill_missing(df: pd.DataFrame) -> pd.DataFrame: if df.empty: return df df = df.copy() df["timestamp"] = pd.to_datetime(df["timestamp"]) df = df.sort_values("timestamp").reset_index(drop=True) # Tổng thời gian dự kiến giữa session expected = timedelta(seconds=EXPECTED_INTERVAL_SEC) tol = timedelta(seconds=TOLERANCE_SEC) # Lọc lỗi và trống rows = [df.iloc[0]] for i in range(1, len(df)): prev, curr = df.iloc[i-1]["timestamp"], df.iloc[i]["timestamp"] rows.append(df.iloc[i]) if curr - prev > expected + tol: for j in range(1, int(round((curr - prev) / expected))): gap_ts = prev + j * expected gap = df.iloc[i-1].copy() gap["timestamp"] = gap_ts for col in ["voltage", "current", "power", "consume"]: gap[col] = np.nan rows.insert(-1, gap) # Sort với ts là identifier df = pd.DataFrame(rows).sort_values("timestamp").reset_index(drop=True) df["consume_clean"] = df["consume"] df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan # Impute 3 giá trị còn lại với KNNImputer non_missing = df[["voltage","current","power"]].dropna().shape[0] k = min(3, max(1, non_missing)) imputer = KNNImputer(n_neighbors=k) df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]]) # Train và pred fit với LinearRegression train = df[df["consume_clean"].notna()] pred = df[df["consume_clean"].isna()] if not train.empty and not pred.empty: logger.info("Start cleaning 🧹") model = LinearRegression().fit(train[["voltage","current","power"]], train["consume_clean"]) try: y_hat = model.predict(pred[["voltage","current","power"]]) df.loc[pred.index, "consume_clean"] = pd.Series(y_hat, index=pred.index) except Exception as e: logger.warning(f"⚠️ Primary model error: {e}") # Nếu còn giá trị trống sau bộ lọc đầu, tái sd LinearRegression và dự đoán trên ts + tổng tg giữa session still = df[df["consume_clean"].isna()] if not still.empty: logger.warning(f"⚠️ {len(still)} rows still missing → timestamp fallback") df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds() fb_train = df[df["consume_clean"].notna()] fb_pred = df[df["consume_clean"].isna()] fb_pred = fb_pred[fb_pred["ts_sec"].notna()].drop_duplicates(subset="timestamp") if not fb_train.empty and not fb_pred.empty: fb_model = LinearRegression().fit(fb_train[["ts_sec"]], fb_train["consume_clean"]) y_fb = fb_model.predict(fb_pred[["ts_sec"]]) df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index) df.drop(columns=["ts_sec"], inplace=True) logger.info("Finish cleaning 🧹") # Giá trị cuối và thải giá trị thừa df["consume"] = df["consume_clean"] # Đánh dấu những bản ghi vẫn còn thiếu consume # Khi hàm trả về, mỗi dòng sẽ có need_backfill = True/False. df.loc[:, "need_backfill"] = df["consume"].isna() return df.drop(columns=["consume_clean"]) # ───────────── MQTT CALLBACKS ───────────── def on_connect(client, userdata, flags, rc): if rc == 0: logger.info("✅ MQTT connected") client.subscribe(MQTT_TOPIC) else: logger.error(f"❌ MQTT connect failed: {rc}") # Pipe chính và debug def on_message(client, userdata, msg): ts = datetime.utcnow().isoformat() payload = msg.payload.decode(errors="replace") row = parse_row(ts,msg.topic,payload) if row is None: return # Ghép vào cửa sổ và fill ngay df_win = pd.DataFrame(window) df_new = pd.concat([df_win, pd.DataFrame([row])], ignore_index=True) df_filled = fill_missing(df_new.tail(2)) # chỉ cần bản ghi trước & mới row_clean = df_filled.tail(1).to_dict("records")[0] row_clean["need_backfill"] = pd.isna(row_clean["consume"]) # Gắn giá trị clean vào window session window.append(row_clean) upsert_mongo([row_clean]) logger.info(f"📥 Stored row {row_clean['timestamp']}") # ───────────── BACK-FILL WORKER ───────────── def backfill_worker(): while not stop_event.is_set(): time.sleep(BACKFILL_INTERVAL) df_win = pd.DataFrame(window) if "need_backfill" not in df_win.columns: continue # chưa có dữ liệu đã qua fill_missing() pending_mask = df_win["need_backfill"] if not pending_mask.any(): continue idxs = df_win[pending_mask].index cols = ["voltage", "current", "power"] imputer = KNNImputer(n_neighbors=3) df_win[cols] = imputer.fit_transform(df_win[cols]) train = df_win[~pending_mask] if train.empty: continue model = LinearRegression().fit(train[cols], train["consume"]) df_win.loc[idxs, "consume"] = model.predict(df_win.loc[idxs, cols]) df_win.loc[idxs, "need_backfill"] = False # update deque for i in idxs: window[i].update(df_win.loc[i].to_dict()) # Upload and merge current on Mongo upsert_mongo([window[i] for i in idxs]) logger.info(f"🔄 Back-filled {len(idxs)} rows") # ─────── FASTAPI ENDPOINTS ─────── @app.get("/fetch") def fetch(Password: str): if Password != FETCH_PASS: raise HTTPException(status_code=401) logger.info("✅ Fetch request received") client = MongoClient(MONGO_URI) data = list(client[MONGO_DB][MONGO_COL].find({}, {"_id": 0})) return JSONResponse(content=jsonable_encoder(data)) @app.get("/delete") def delete(Password: str): if Password != FETCH_PASS: raise HTTPException(status_code=401) client = MongoClient(MONGO_URI) count = client[MONGO_DB][MONGO_COL].delete_many({}).deleted_count logger.info("⚠️ Delete request received") return {"message": f"🧨 Deleted {count} rows from MongoDB."} @app.get("/load") def load(Password: str): if Password != FETCH_PASS: raise HTTPException(status_code=401) client = MongoClient(MONGO_URI) df = pd.DataFrame(list(client[MONGO_DB][MONGO_COL].find({}, {"_id": 0}))) if df.empty: raise HTTPException(status_code=404, detail="No data found.") logger.info("✅ Download request received") df.to_csv(EXPORT_CSV_PATH, index=False) return FileResponse(EXPORT_CSV_PATH, filename="poptech_cleaned_data.csv", media_type="text/csv") @app.get("/healthz") def health(): return {"status": "ok"} # ─────── BOOTSTRAP ─────── def mqtt_main(): client = mqtt.Client() client.username_pw_set(USERNAME, PASSWORD) client.on_connect = on_connect client.on_message = on_message client.connect(BROKER, PORT, 60) client.loop_forever() # Handle parallel threads if __name__ == "__main__": # Set signal handlers in main thread def handle_exit(sig, _): logger.info("🛑 Shutdown signal received") stop_event.set() for s in [signal.SIGINT, signal.SIGTERM]: signal.signal(s, handle_exit) # Handle data ingestion from MQTT broker, and backfiller threading.Thread(target=backfill_worker, daemon=True).start() # quét back-fill 10s/lần threading.Thread(target=mqtt_main, daemon=True).start() uvicorn.run(app, host="0.0.0.0", port=7860)