Spaces:
Sleeping
Sleeping
Commit
·
f105ff3
1
Parent(s):
b8524f4
Upd logger
Browse files
app.py
CHANGED
|
@@ -99,6 +99,7 @@ def upsert_mongo(docs):
|
|
| 99 |
col.create_index("timestamp", unique=True)
|
| 100 |
ops = [UpdateOne({"_id": d["timestamp"]}, {"$set": d}, upsert=True) for d in docs]
|
| 101 |
col.bulk_write(ops, ordered=False)
|
|
|
|
| 102 |
except Exception as e:
|
| 103 |
logger.error(f"❌ Mongo error: {e}")
|
| 104 |
|
|
@@ -137,6 +138,7 @@ def fill_missing(df: pd.DataFrame) -> pd.DataFrame:
|
|
| 137 |
train = df[df["consume_clean"].notna()]
|
| 138 |
pred = df[df["consume_clean"].isna()]
|
| 139 |
if not train.empty and not pred.empty:
|
|
|
|
| 140 |
model = LinearRegression().fit(train[["voltage","current","power"]], train["consume_clean"])
|
| 141 |
try:
|
| 142 |
y_hat = model.predict(pred[["voltage","current","power"]])
|
|
@@ -156,6 +158,7 @@ def fill_missing(df: pd.DataFrame) -> pd.DataFrame:
|
|
| 156 |
y_fb = fb_model.predict(fb_pred[["ts_sec"]])
|
| 157 |
df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index)
|
| 158 |
df.drop(columns=["ts_sec"], inplace=True)
|
|
|
|
| 159 |
# Giá trị cuối và thải giá trị thừa
|
| 160 |
df["consume"] = df["consume_clean"]
|
| 161 |
# Đánh dấu những bản ghi vẫn còn thiếu consume
|
|
@@ -195,6 +198,8 @@ def backfill_worker():
|
|
| 195 |
while not stop_event.is_set():
|
| 196 |
time.sleep(BACKFILL_INTERVAL)
|
| 197 |
df_win = pd.DataFrame(window)
|
|
|
|
|
|
|
| 198 |
pending_mask = df_win["need_backfill"]
|
| 199 |
if not pending_mask.any():
|
| 200 |
continue
|
|
|
|
| 99 |
col.create_index("timestamp", unique=True)
|
| 100 |
ops = [UpdateOne({"_id": d["timestamp"]}, {"$set": d}, upsert=True) for d in docs]
|
| 101 |
col.bulk_write(ops, ordered=False)
|
| 102 |
+
logger.info(f"🪣 Inserted to MongoDB.")
|
| 103 |
except Exception as e:
|
| 104 |
logger.error(f"❌ Mongo error: {e}")
|
| 105 |
|
|
|
|
| 138 |
train = df[df["consume_clean"].notna()]
|
| 139 |
pred = df[df["consume_clean"].isna()]
|
| 140 |
if not train.empty and not pred.empty:
|
| 141 |
+
logger.info("Start cleaning 🧹")
|
| 142 |
model = LinearRegression().fit(train[["voltage","current","power"]], train["consume_clean"])
|
| 143 |
try:
|
| 144 |
y_hat = model.predict(pred[["voltage","current","power"]])
|
|
|
|
| 158 |
y_fb = fb_model.predict(fb_pred[["ts_sec"]])
|
| 159 |
df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index)
|
| 160 |
df.drop(columns=["ts_sec"], inplace=True)
|
| 161 |
+
logger.info("Finish cleaning 🧹")
|
| 162 |
# Giá trị cuối và thải giá trị thừa
|
| 163 |
df["consume"] = df["consume_clean"]
|
| 164 |
# Đánh dấu những bản ghi vẫn còn thiếu consume
|
|
|
|
| 198 |
while not stop_event.is_set():
|
| 199 |
time.sleep(BACKFILL_INTERVAL)
|
| 200 |
df_win = pd.DataFrame(window)
|
| 201 |
+
if "need_backfill" not in df_win.columns:
|
| 202 |
+
continue # chưa có dữ liệu đã qua fill_missing()
|
| 203 |
pending_mask = df_win["need_backfill"]
|
| 204 |
if not pending_mask.any():
|
| 205 |
continue
|