Spaces:
Sleeping
Sleeping
File size: 11,666 Bytes
60369a6 5b851f5 b8524f4 5b851f5 d5d7292 60369a6 a6ce345 60369a6 b8524f4 60369a6 b8524f4 60369a6 b8524f4 5b851f5 b8524f4 5b851f5 b8524f4 5b851f5 60369a6 5b851f5 b8524f4 5b851f5 b8524f4 5b851f5 b8524f4 9b7bb28 b8524f4 23f62fe b8524f4 60369a6 b8524f4 9b7bb28 b8524f4 f105ff3 9b7bb28 b8524f4 5b851f5 b8524f4 d2ffc90 a6ce345 5b851f5 a6ce345 b8524f4 5b851f5 b8524f4 60369a6 5b851f5 b8524f4 60369a6 f44847d b8524f4 f44847d 5b851f5 f44847d b8524f4 f44847d 60369a6 b8524f4 60369a6 f44847d 60369a6 f105ff3 b8524f4 d2ffc90 b8524f4 f44847d b8524f4 d2ffc90 f44847d b8524f4 f44847d b8524f4 f44847d d2ffc90 f105ff3 b8524f4 60369a6 b8524f4 60369a6 b8524f4 5b851f5 b8524f4 5b851f5 b8524f4 f105ff3 b8524f4 5b851f5 b8524f4 5b851f5 60369a6 f9017b9 60369a6 a6ce345 60369a6 f9017b9 60369a6 f9017b9 60369a6 5b851f5 60369a6 5b851f5 60369a6 5b851f5 0a6246a 5b851f5 4216415 b8524f4 4216415 60369a6 4216415 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# app.py
# Root API: https://binkhoale1812-poptech-cleaner.hf.space/
# Usages:
## https://binkhoale1812-poptech-cleaner.hf.space/fetch?Password=...
## https://binkhoale1812-poptech-cleaner.hf.space/load?Password=...
## https://binkhoale1812-poptech-cleaner.hf.space/delete?Password=...
import os, json, signal, logging, threading, time
from datetime import datetime, timedelta
from collections import deque
import paho.mqtt.client as mqtt
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
from sklearn.linear_model import LinearRegression
from pymongo import MongoClient, UpdateOne
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.encoders import jsonable_encoder
import uvicorn
# ─────── ENV CONFIG ───────
# All runtime configuration comes from environment variables (.env supported).
load_dotenv()
BROKER = os.getenv("BROKER")                        # MQTT broker hostname
PORT = int(os.getenv("PORT", 1883))                 # MQTT broker port
USERNAME = os.getenv("USERNAME")                    # MQTT credentials
PASSWORD = os.getenv("PASSWORD")
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "device/socket/reply/#")
MONGO_URI = os.getenv("MONGO_URI")
MONGO_DB = os.getenv("MONGO_DB", "poptech")
MONGO_COL = os.getenv("MONGO_COLLECTION", "device_clean")
FETCH_PASS = os.getenv("FETCH_PASSWORD")            # shared secret for the HTTP endpoints
# Processing parameters (timing)
EXPECTED_INTERVAL_SEC = int(os.getenv("EXPECTED_INTERVAL_SEC", 30))  # expected gap between device frames
TOLERANCE_SEC = int(os.getenv("TOLERANCE_SEC", 10))                  # slack before a gap counts as missing data
BUFFER_SECONDS = int(os.getenv("BUFFER_SECONDS", 4 * 3600))  # 4 hours of in-memory window
BACKFILL_INTERVAL = int(os.getenv("BACKFILL_INTERVAL", 10))  # back-fill sweep every 10 seconds
EXPORT_CSV_PATH = "mongo_cleaned_export.csv"        # scratch file for the /load CSV export
# ─────────────── LOGGING ───────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s — %(name)s — %(levelname)s — %(message)s",
    force=True  # replace any handlers configured by the hosting runtime
)
logger = logging.getLogger("poptech-cleaner")
# ─────────────── GLOBALS ───────────────
# Window capacity: one slot per expected frame over BUFFER_SECONDS, plus headroom.
win_len = BUFFER_SECONDS // EXPECTED_INTERVAL_SEC + 200
window = deque(maxlen=win_len)  # holds roughly the last 4 hours of cleaned rows
stop_event = threading.Event()  # signals the back-fill worker thread to stop
app = FastAPI()
# ─────────────── UTILITIES ───────────────
# Đảm bảo giá trị là float, nếu không flag NaN
def safe_float(x):
    """Coerce *x* to float; return NaN when conversion is impossible."""
    try:
        return float(x)
    except (TypeError, ValueError):  # narrowed: a bare except would mask unrelated bugs
        return np.nan
def parse_row(ts: str, topic: str, payload: str):
    """Parse one MQTT payload into a flat measurement dict.

    Returns the parsed dict, or None when the topic is foreign, the payload
    is malformed, or the frame is idle (all readings zero/absent).
    """
    # Cheap topic filter first — skip JSON parsing entirely for foreign topics.
    if not topic.startswith("device/socket/reply/"):
        return None
    try:
        # Some devices double up the quotes; normalise before decoding.
        j = json.loads(payload.replace('""', '"'))
        data = j.get("data", [])
        if not isinstance(data, list):
            return None
        # Pad to four slots: voltage, current, power, consumed energy.
        # (Was j["data"], which could KeyError past the isinstance check.)
        v, a, w, c = (data + [None] * 4)[:4]
        logger.info(f"📩 MQTT: {ts} | V={v}V, A={a}A, W={w}W, mWh={c}")
        # Drop idle frames (all readings zero or missing).
        if all(x in (0, None) for x in (a, w, c)):
            return None
        return {
            "timestamp": ts,
            "id": j.get("id"),
            "imei": j.get("imei"),
            "type": j.get("type"),
            "voltage": safe_float(v),
            "current": safe_float(a),
            "power": safe_float(w),
            "consume": safe_float(c)
        }
    except Exception:
        # Best-effort parser: malformed frames are dropped silently by design.
        return None
# Upsert freshly cleaned rows into MongoDB
def upsert_mongo(docs):
    """Bulk-upsert *docs* keyed by timestamp (used as _id); no-op on empty input."""
    if not docs:
        return
    client = None
    try:
        client = MongoClient(MONGO_URI)
        col = client[MONGO_DB][MONGO_COL]
        col.create_index("timestamp", unique=True)
        # _id = timestamp → re-processing the same instant overwrites, never duplicates.
        ops = [UpdateOne({"_id": d["timestamp"]}, {"$set": d}, upsert=True) for d in docs]
        col.bulk_write(ops, ordered=False)
        logger.info(f"🪣 Inserted to MongoDB.")
    except Exception as e:
        logger.error(f"❌ Mongo error: {e}")
    finally:
        # Fix: the client was previously leaked on every call.
        if client is not None:
            client.close()
# Impute missing values across the whole frame
def fill_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Clean a frame of device rows.

    Inserts NaN placeholder rows for time gaps, imputes voltage/current/power
    with KNN, and reconstructs `consume` with a linear regression on the
    electrical readings (falling back to a timestamp-only regression).
    Returns the frame sorted by timestamp with a boolean `need_backfill`
    column marking rows whose `consume` is still missing.
    """
    if df.empty:
        return df
    df = df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df = df.sort_values("timestamp").reset_index(drop=True)
    # Expected spacing between consecutive samples, plus tolerance
    expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
    tol = timedelta(seconds=TOLERANCE_SEC)
    # Walk pairs of rows; for each oversized gap insert one blank row per
    # missed interval so the series becomes evenly spaced
    rows = [df.iloc[0]]
    for i in range(1, len(df)):
        prev, curr = df.iloc[i-1]["timestamp"], df.iloc[i]["timestamp"]
        rows.append(df.iloc[i])
        if curr - prev > expected + tol:
            for j in range(1, int(round((curr - prev) / expected))):
                gap_ts = prev + j * expected
                gap = df.iloc[i-1].copy()  # clone the previous row, blank its readings
                gap["timestamp"] = gap_ts
                for col in ["voltage", "current", "power", "consume"]:
                    gap[col] = np.nan
                rows.insert(-1, gap)  # keep the just-appended current row last
    # Re-sort with the timestamp as identifier
    df = pd.DataFrame(rows).sort_values("timestamp").reset_index(drop=True)
    df["consume_clean"] = df["consume"]
    # Invalidate negative readings and counter resets (cumulative value dropped)
    df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
    # Impute the three electrical readings with KNNImputer;
    # k is clamped so n_neighbors never exceeds the complete-row count
    non_missing = df[["voltage","current","power"]].dropna().shape[0]
    k = min(3, max(1, non_missing))
    imputer = KNNImputer(n_neighbors=k)
    df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]])
    # Fit LinearRegression on rows with a valid consume; predict the rest
    train = df[df["consume_clean"].notna()]
    pred = df[df["consume_clean"].isna()]
    if not train.empty and not pred.empty:
        logger.info("Start cleaning 🧹")
        model = LinearRegression().fit(train[["voltage","current","power"]], train["consume_clean"])
        try:
            y_hat = model.predict(pred[["voltage","current","power"]])
            df.loc[pred.index, "consume_clean"] = pd.Series(y_hat, index=pred.index)
        except Exception as e:
            logger.warning(f"⚠️ Primary model error: {e}")
    # Fallback: if values are still missing after the primary model, refit a
    # LinearRegression on elapsed seconds since the first sample
    still = df[df["consume_clean"].isna()]
    if not still.empty:
        logger.warning(f"⚠️ {len(still)} rows still missing → timestamp fallback")
        df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds()
        fb_train = df[df["consume_clean"].notna()]
        fb_pred = df[df["consume_clean"].isna()]
        fb_pred = fb_pred[fb_pred["ts_sec"].notna()].drop_duplicates(subset="timestamp")
        if not fb_train.empty and not fb_pred.empty:
            fb_model = LinearRegression().fit(fb_train[["ts_sec"]], fb_train["consume_clean"])
            y_fb = fb_model.predict(fb_pred[["ts_sec"]])
            df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index)
        df.drop(columns=["ts_sec"], inplace=True)
    logger.info("Finish cleaning 🧹")
    # Promote the cleaned series and discard the scratch column
    df["consume"] = df["consume_clean"]
    # Flag records whose consume is still missing;
    # on return, every row carries need_backfill = True/False.
    df.loc[:, "need_backfill"] = df["consume"].isna()
    return df.drop(columns=["consume_clean"])
# ───────────── MQTT CALLBACKS ─────────────
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe on success, log the failure code otherwise."""
    if rc != 0:
        logger.error(f"❌ MQTT connect failed: {rc}")
        return
    logger.info("✅ MQTT connected")
    client.subscribe(MQTT_TOPIC)
# Main ingestion pipe (with debug logging)
def on_message(client, userdata, msg):
    """MQTT message callback: parse the frame, clean it against the window, persist it."""
    arrival = datetime.utcnow().isoformat()
    decoded = msg.payload.decode(errors="replace")
    parsed = parse_row(arrival, msg.topic, decoded)
    if parsed is None:
        return
    # Stitch the fresh row onto the current window and clean immediately
    combined = pd.concat([pd.DataFrame(window), pd.DataFrame([parsed])], ignore_index=True)
    cleaned = fill_missing(combined.tail(2))  # only the previous & the new record are needed
    row_clean = cleaned.tail(1).to_dict("records")[0]
    row_clean["need_backfill"] = pd.isna(row_clean["consume"])
    # Keep the cleaned row inside the in-memory session window
    window.append(row_clean)
    upsert_mongo([row_clean])
    logger.info(f"📥 Stored row {row_clean['timestamp']}")
# ───────────── BACK-FILL WORKER ─────────────
def backfill_worker():
    """Background loop: every BACKFILL_INTERVAL seconds, re-predict `consume`
    for rows still flagged need_backfill using the whole in-memory window."""
    while not stop_event.is_set():
        time.sleep(BACKFILL_INTERVAL)
        try:
            df_win = pd.DataFrame(window)
            if "need_backfill" not in df_win.columns:
                continue  # no row has passed fill_missing() yet
            pending_mask = df_win["need_backfill"]
            if not pending_mask.any():
                continue
            idxs = df_win[pending_mask].index
            cols = ["voltage", "current", "power"]
            # Fix: clamp k like fill_missing() does — KNNImputer raises when
            # n_neighbors exceeds the number of complete samples.
            complete = df_win[cols].dropna().shape[0]
            imputer = KNNImputer(n_neighbors=min(3, max(1, complete)))
            df_win[cols] = imputer.fit_transform(df_win[cols])
            train = df_win[~pending_mask]
            if train.empty:
                continue
            model = LinearRegression().fit(train[cols], train["consume"])
            df_win.loc[idxs, "consume"] = model.predict(df_win.loc[idxs, cols])
            df_win.loc[idxs, "need_backfill"] = False
            # Push the repaired values back into the deque entries
            for i in idxs:
                window[i].update(df_win.loc[i].to_dict())
            # Upload and merge the repaired rows on Mongo
            upsert_mongo([window[i] for i in idxs])
            logger.info(f"🔄 Back-filled {len(idxs)} rows")
        except Exception as e:
            # Fix: one failing cycle must not kill the daemon thread for good.
            logger.error(f"❌ Back-fill cycle failed: {e}")
# ─────── FASTAPI ENDPOINTS ───────
@app.get("/fetch")
def fetch(Password: str):
    """Return every cleaned document as JSON. Guarded by FETCH_PASSWORD."""
    if Password != FETCH_PASS:
        raise HTTPException(status_code=401)
    logger.info("✅ Fetch request received")
    # Fix: close the client after use instead of leaking one connection per request.
    with MongoClient(MONGO_URI) as client:
        data = list(client[MONGO_DB][MONGO_COL].find({}, {"_id": 0}))
    return JSONResponse(content=jsonable_encoder(data))
@app.get("/delete")
def delete(Password: str):
    """Drop every document from the collection. Guarded by FETCH_PASSWORD."""
    if Password != FETCH_PASS:
        raise HTTPException(status_code=401)
    # Fix: close the client after use instead of leaking one connection per request.
    with MongoClient(MONGO_URI) as client:
        count = client[MONGO_DB][MONGO_COL].delete_many({}).deleted_count
    logger.info("⚠️ Delete request received")
    return {"message": f"🧨 Deleted {count} rows from MongoDB."}
@app.get("/load")
def load(Password: str):
    """Export the whole collection as a CSV download. Guarded by FETCH_PASSWORD."""
    if Password != FETCH_PASS:
        raise HTTPException(status_code=401)
    # Fix: close the client after use instead of leaking one connection per request.
    with MongoClient(MONGO_URI) as client:
        df = pd.DataFrame(list(client[MONGO_DB][MONGO_COL].find({}, {"_id": 0})))
    if df.empty:
        raise HTTPException(status_code=404, detail="No data found.")
    logger.info("✅ Download request received")
    df.to_csv(EXPORT_CSV_PATH, index=False)
    return FileResponse(EXPORT_CSV_PATH, filename="poptech_cleaned_data.csv", media_type="text/csv")
@app.get("/healthz")
def health():
    """Liveness probe for the hosting platform."""
    return dict(status="ok")
# ─────── BOOTSTRAP ───────
def mqtt_main():
    """Build, configure, and run the blocking MQTT client loop."""
    mq = mqtt.Client()
    mq.username_pw_set(USERNAME, PASSWORD)
    # Wire both callbacks before connecting so no early event is missed
    mq.on_connect = on_connect
    mq.on_message = on_message
    # 60-second keep-alive, then block forever servicing the network loop
    mq.connect(BROKER, PORT, 60)
    mq.loop_forever()
# Bootstrap: run the back-fill worker, the MQTT loop, and the API in parallel
if __name__ == "__main__":
    # Set signal handlers in the main thread (signal.signal only works there)
    def handle_exit(sig, _):
        logger.info("🛑 Shutdown signal received")
        stop_event.set()
    for s in [signal.SIGINT, signal.SIGTERM]:
        signal.signal(s, handle_exit)
    # Handle data ingestion from MQTT broker, and backfiller
    threading.Thread(target=backfill_worker, daemon=True).start() # back-fill sweep every BACKFILL_INTERVAL (10 s)
    threading.Thread(target=mqtt_main, daemon=True).start()
    uvicorn.run(app, host="0.0.0.0", port=7860)
|