File size: 11,666 Bytes
60369a6
 
 
 
 
 
 
 
5b851f5
b8524f4
5b851f5
 
 
 
 
 
 
d5d7292
60369a6
 
a6ce345
60369a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b8524f4
60369a6
b8524f4
 
 
60369a6
b8524f4
5b851f5
b8524f4
5b851f5
b8524f4
5b851f5
60369a6
5b851f5
 
 
b8524f4
 
 
 
 
5b851f5
b8524f4
 
 
 
 
5b851f5
b8524f4
 
9b7bb28
b8524f4
 
 
 
 
 
23f62fe
b8524f4
 
 
 
 
 
 
 
 
 
 
 
 
60369a6
b8524f4
 
 
 
 
 
9b7bb28
b8524f4
 
 
 
 
f105ff3
9b7bb28
b8524f4
5b851f5
b8524f4
 
d2ffc90
 
a6ce345
5b851f5
a6ce345
b8524f4
5b851f5
b8524f4
 
60369a6
5b851f5
b8524f4
60369a6
 
 
f44847d
b8524f4
f44847d
5b851f5
f44847d
 
b8524f4
f44847d
60369a6
 
b8524f4
 
 
 
 
 
60369a6
f44847d
60369a6
f105ff3
b8524f4
d2ffc90
b8524f4
f44847d
 
b8524f4
 
 
 
 
d2ffc90
f44847d
 
b8524f4
f44847d
b8524f4
f44847d
 
d2ffc90
f105ff3
b8524f4
60369a6
b8524f4
 
 
60369a6
 
b8524f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b851f5
b8524f4
 
5b851f5
b8524f4
 
f105ff3
 
b8524f4
 
 
 
 
 
 
 
 
5b851f5
b8524f4
 
 
 
 
 
 
 
 
5b851f5
60369a6
 
 
 
 
f9017b9
60369a6
 
a6ce345
60369a6
 
 
 
 
 
 
f9017b9
60369a6
 
 
 
 
 
 
 
 
 
f9017b9
60369a6
 
5b851f5
60369a6
 
 
5b851f5
60369a6
 
5b851f5
 
 
 
 
 
 
0a6246a
5b851f5
4216415
 
 
 
 
 
b8524f4
 
4216415
60369a6
4216415
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# app.py
# Root API: https://binkhoale1812-poptech-cleaner.hf.space/
# Usages: 
## https://binkhoale1812-poptech-cleaner.hf.space/fetch?Password=...
## https://binkhoale1812-poptech-cleaner.hf.space/load?Password=...
## https://binkhoale1812-poptech-cleaner.hf.space/delete?Password=...

import os, json, signal, logging, threading, time
from datetime import datetime, timedelta
from collections import deque 

import paho.mqtt.client as mqtt
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
from sklearn.linear_model import LinearRegression
from pymongo import MongoClient, UpdateOne
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.encoders import jsonable_encoder
import uvicorn

# ─────── ENV CONFIG ───────
load_dotenv()

# MQTT broker connection settings
BROKER       = os.getenv("BROKER")
PORT         = int(os.getenv("PORT", 1883))
USERNAME     = os.getenv("USERNAME")
PASSWORD     = os.getenv("PASSWORD")
MQTT_TOPIC   = os.getenv("MQTT_TOPIC", "device/socket/reply/#")

# MongoDB target and the shared password guarding the HTTP endpoints
MONGO_URI    = os.getenv("MONGO_URI")
MONGO_DB     = os.getenv("MONGO_DB", "poptech")
MONGO_COL    = os.getenv("MONGO_COLLECTION", "device_clean")
FETCH_PASS   = os.getenv("FETCH_PASSWORD")

# Processing parameters (timing)
EXPECTED_INTERVAL_SEC = int(os.getenv("EXPECTED_INTERVAL_SEC", 30))
TOLERANCE_SEC         = int(os.getenv("TOLERANCE_SEC", 10))
BUFFER_SECONDS        = int(os.getenv("BUFFER_SECONDS", 4 * 3600))   # 4 hours
BACKFILL_INTERVAL     = int(os.getenv("BACKFILL_INTERVAL", 10))      # 10 seconds

# Scratch path used by the /load endpoint for the CSV export
EXPORT_CSV_PATH       = "mongo_cleaned_export.csv"

# ─────────────── LOGGING ───────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s — %(name)s — %(levelname)s — %(message)s",
    force=True
)
logger = logging.getLogger("poptech-cleaner")

# ─────────────── GLOBALS ───────────────
# Ring buffer sized for BUFFER_SECONDS of samples, plus slack for gap rows
win_len      = BUFFER_SECONDS // EXPECTED_INTERVAL_SEC + 200
window       = deque(maxlen=win_len)           # holds the most recent 4 hours
stop_event   = threading.Event()
app          = FastAPI()

# ─────────────── UTILITIES ───────────────
# Coerce a value to float; flag NaN when it is not numeric
def safe_float(x):
    """Return float(x), or NaN when x cannot be converted.

    Catches only TypeError/ValueError so genuinely unexpected failures
    (e.g. KeyboardInterrupt) are not silently swallowed by a bare except.
    """
    try:
        return float(x)
    except (TypeError, ValueError):
        return np.nan

def parse_row(ts: str, topic: str, payload: str):
    """Parse one MQTT frame into a measurement dict, or None when invalid.

    Accepts only topics under ``device/socket/reply/`` whose JSON payload
    carries a list ``data`` of [voltage, current, power, consume]. Idle
    frames (current, power and consume all zero/absent) are dropped.
    """
    try:
        # Cheap topic filter first — skip JSON parsing for foreign topics
        if not topic.startswith("device/socket/reply/"):
            return None
        # Payloads sometimes arrive with doubled quotes — normalise first
        j = json.loads(payload.replace('""', '"'))
        data = j.get("data")
        if not isinstance(data, list):
            # Rejects both a missing "data" key and a non-list value.
            # (The original passed a [] default to the isinstance check and
            # relied on j["data"] raising KeyError to reject missing keys.)
            return None
        v, a, w, c = (data + [None] * 4)[:4]
        logger.info(f"📩 MQTT: {ts} | V={v}V, A={a}A, W={w}W, mWh={c}")
        # Drop idle frames (all metrics 0/None)
        if all(x in (0, None) for x in (a, w, c)):
            return None
        return {
            "timestamp": ts,
            "id": j.get("id"),
            "imei": j.get("imei"),
            "type": j.get("type"),
            "voltage": safe_float(v),
            "current": safe_float(a),
            "power": safe_float(w),
            "consume": safe_float(c)
        }
    except Exception:
        # Malformed JSON or unexpected structure → best-effort skip
        return None

# Upload newly cleaned rows to the DB
def upsert_mongo(docs):
    """Upsert parsed rows into MongoDB, keyed by timestamp (idempotent).

    Each document is written with ``_id`` = its timestamp, so re-sending
    the same row overwrites rather than duplicates. Errors are logged and
    swallowed deliberately — ingestion must not crash on a DB hiccup.
    """
    if not docs:
        return
    try:
        # Context manager closes the client — the original leaked one
        # connection pool per call
        with MongoClient(MONGO_URI) as client:
            col = client[MONGO_DB][MONGO_COL]
            col.create_index("timestamp", unique=True)
            ops = [UpdateOne({"_id": d["timestamp"]}, {"$set": d}, upsert=True) for d in docs]
            col.bulk_write(ops, ordered=False)
        logger.info("🪣 Inserted to MongoDB.")
    except Exception as e:
        logger.error(f"❌ Mongo error: {e}")

# Gap-fill and impute the aggregated window
def fill_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Insert placeholder rows for timestamp gaps, then impute missing values.

    Pipeline:
      1. Insert NaN rows wherever consecutive timestamps are further apart
         than EXPECTED_INTERVAL_SEC + TOLERANCE_SEC.
      2. Invalidate negative or decreasing ``consume`` readings.
      3. Impute voltage/current/power with KNNImputer.
      4. Predict missing ``consume`` from (voltage, current, power) with
         LinearRegression; fall back to a regression on elapsed seconds.
      5. Flag rows whose ``consume`` is still NaN with need_backfill=True.
    """
    if df.empty:
        return df
    df = df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df = df.sort_values("timestamp").reset_index(drop=True)
    # Expected spacing between consecutive samples, plus tolerance
    expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
    tol      = timedelta(seconds=TOLERANCE_SEC)
    # Walk consecutive pairs and synthesise NaN placeholder rows inside gaps
    rows = [df.iloc[0]]
    for i in range(1, len(df)):
        prev, curr = df.iloc[i-1]["timestamp"], df.iloc[i]["timestamp"]
        rows.append(df.iloc[i])
        if curr - prev > expected + tol:
            for j in range(1, int(round((curr - prev) / expected))):
                gap_ts = prev + j * expected
                # Clone the previous row for metadata, blank out the metrics
                gap = df.iloc[i-1].copy()
                gap["timestamp"] = gap_ts
                for col in ["voltage", "current", "power", "consume"]:
                    gap[col] = np.nan
                # insert(-1) places each gap row before the just-appended
                # current row, so chronological order is preserved
                rows.insert(-1, gap)
    # Rebuild the frame ordered by timestamp
    df = pd.DataFrame(rows).sort_values("timestamp").reset_index(drop=True)
    df["consume_clean"] = df["consume"]
    # NOTE(review): consume is presumably a cumulative counter — negative
    # or decreasing readings are treated as glitches and cleared (confirm)
    df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
    # Impute the three instantaneous metrics; k is capped by the number of
    # complete rows so KNNImputer never asks for more neighbours than exist
    non_missing = df[["voltage","current","power"]].dropna().shape[0]
    k = min(3, max(1, non_missing))
    imputer = KNNImputer(n_neighbors=k)
    df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]])
    # Primary model: predict consume from the imputed electrical metrics
    train = df[df["consume_clean"].notna()]
    pred  = df[df["consume_clean"].isna()]
    if not train.empty and not pred.empty:
        logger.info("Start cleaning 🧹")
        model = LinearRegression().fit(train[["voltage","current","power"]], train["consume_clean"])
        try:
            y_hat = model.predict(pred[["voltage","current","power"]])
            df.loc[pred.index, "consume_clean"] = pd.Series(y_hat, index=pred.index)
        except Exception as e:
            logger.warning(f"⚠️ Primary model error: {e}")
    # Fallback: regress remaining gaps on elapsed seconds since window start
    still = df[df["consume_clean"].isna()]
    if not still.empty:
        logger.warning(f"⚠️ {len(still)} rows still missing → timestamp fallback")
        df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds()
        fb_train = df[df["consume_clean"].notna()]
        fb_pred  = df[df["consume_clean"].isna()]
        fb_pred  = fb_pred[fb_pred["ts_sec"].notna()].drop_duplicates(subset="timestamp")
        if not fb_train.empty and not fb_pred.empty:
            fb_model = LinearRegression().fit(fb_train[["ts_sec"]], fb_train["consume_clean"])
            y_fb = fb_model.predict(fb_pred[["ts_sec"]])
            df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index)
        df.drop(columns=["ts_sec"], inplace=True)
        logger.info("Finish cleaning 🧹")
    # Promote the cleaned series and drop the scratch column
    df["consume"] = df["consume_clean"]
    # Rows whose consume is still NaN are left for the back-fill worker;
    # every returned row carries need_backfill = True/False.
    df.loc[:, "need_backfill"] = df["consume"].isna()
    return df.drop(columns=["consume_clean"])

# ───────────── MQTT CALLBACKS ─────────────
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe on success, log the failure code otherwise."""
    if rc != 0:
        logger.error(f"❌ MQTT connect failed: {rc}")
        return
    logger.info("✅ MQTT connected")
    client.subscribe(MQTT_TOPIC)

# Main ingestion pipe (with debug logging)
def on_message(client, userdata, msg):
    """MQTT message callback: parse, clean and persist one incoming sample."""
    # NOTE(review): utcnow() is naive UTC; stored timestamps carry no tzinfo
    ts = datetime.utcnow().isoformat()
    payload = msg.payload.decode(errors="replace")
    row = parse_row(ts,msg.topic,payload)
    if row is None: return
    # Join the new row onto the rolling window and impute immediately
    df_win = pd.DataFrame(window)
    df_new = pd.concat([df_win, pd.DataFrame([row])], ignore_index=True)
    df_filled = fill_missing(df_new.tail(2))  # only the previous & new record are needed
    row_clean = df_filled.tail(1).to_dict("records")[0]
    row_clean["need_backfill"] = pd.isna(row_clean["consume"])
    # Append the cleaned row to the in-memory window session
    window.append(row_clean)
    upsert_mongo([row_clean])
    logger.info(f"📥 Stored row {row_clean['timestamp']}")

# ───────────── BACK-FILL WORKER ─────────────
def backfill_worker():
    """Background thread: every BACKFILL_INTERVAL seconds, re-impute rows in
    the window still flagged need_backfill and upsert them to MongoDB.

    NOTE(review): `window` is mutated concurrently by the MQTT thread with no
    lock — appears tolerated by design; confirm if stronger guarantees needed.
    """
    while not stop_event.is_set():
        time.sleep(BACKFILL_INTERVAL)
        df_win = pd.DataFrame(window)
        if "need_backfill" not in df_win.columns:
            continue  # no data has passed through fill_missing() yet
        pending_mask = df_win["need_backfill"]
        if not pending_mask.any():
            continue
        idxs = df_win[pending_mask].index
        cols = ["voltage", "current", "power"]
        # NOTE(review): fixed k=3 will raise if the window holds <3 samples
        imputer = KNNImputer(n_neighbors=3)
        df_win[cols] = imputer.fit_transform(df_win[cols])
        train = df_win[~pending_mask]
        if train.empty:
            continue
        model = LinearRegression().fit(train[cols], train["consume"])
        df_win.loc[idxs, "consume"] = model.predict(df_win.loc[idxs, cols])
        df_win.loc[idxs, "need_backfill"] = False
        # Write predictions back into the deque entries in place — the
        # DataFrame's default RangeIndex lines up with deque positions here
        for i in idxs:
            window[i].update(df_win.loc[i].to_dict())
        # Upload and merge the refreshed rows into Mongo
        upsert_mongo([window[i] for i in idxs])
        logger.info(f"🔄 Back-filled {len(idxs)} rows")

# ─────── FASTAPI ENDPOINTS ───────
@app.get("/fetch")
def fetch(Password: str):
    """Return every cleaned document as JSON. Requires the shared password."""
    if Password != FETCH_PASS:
        raise HTTPException(status_code=401)
    logger.info("✅ Fetch request received")
    # Close the client when done — the original leaked a connection per request
    with MongoClient(MONGO_URI) as client:
        data = list(client[MONGO_DB][MONGO_COL].find({}, {"_id": 0}))
    return JSONResponse(content=jsonable_encoder(data))

@app.get("/delete")
def delete(Password: str):
    """Delete every document in the collection. Requires the shared password."""
    if Password != FETCH_PASS:
        raise HTTPException(status_code=401)
    # Log destructive intent before acting, and close the client when done
    logger.info("⚠️ Delete request received")
    with MongoClient(MONGO_URI) as client:
        count = client[MONGO_DB][MONGO_COL].delete_many({}).deleted_count
    return {"message": f"🧨 Deleted {count} rows from MongoDB."}

@app.get("/load")
def load(Password: str):
    """Export the whole collection as a CSV download. Requires the shared password.

    Raises 404 when the collection is empty.
    """
    if Password != FETCH_PASS:
        raise HTTPException(status_code=401)
    # Close the client when done — the original leaked a connection per request
    with MongoClient(MONGO_URI) as client:
        df = pd.DataFrame(list(client[MONGO_DB][MONGO_COL].find({}, {"_id": 0})))
    if df.empty:
        raise HTTPException(status_code=404, detail="No data found.")
    logger.info("✅ Download request received")
    df.to_csv(EXPORT_CSV_PATH, index=False)
    return FileResponse(EXPORT_CSV_PATH, filename="poptech_cleaned_data.csv", media_type="text/csv")

@app.get("/healthz")
def health():
    """Liveness probe endpoint: always reports the service as up."""
    return dict(status="ok")

# ─────── BOOTSTRAP ───────
def mqtt_main():
    """Connect to the MQTT broker and block forever processing messages."""
    mq = mqtt.Client()
    mq.username_pw_set(USERNAME, PASSWORD)
    mq.on_connect = on_connect
    mq.on_message = on_message
    mq.connect(BROKER, PORT, 60)
    mq.loop_forever()

# Handle parallel threads: MQTT ingest + back-fill workers, uvicorn in main
if __name__ == "__main__":
    # Set signal handlers in main thread (the signal module requires this)
    def handle_exit(sig, _):
        logger.info("🛑 Shutdown signal received")
        stop_event.set()
    for s in [signal.SIGINT, signal.SIGTERM]:
        signal.signal(s, handle_exit)
    # Handle data ingestion from MQTT broker, and backfiller
    threading.Thread(target=backfill_worker, daemon=True).start()   # back-fill sweep every 10 s
    threading.Thread(target=mqtt_main, daemon=True).start()
    uvicorn.run(app, host="0.0.0.0", port=7860)