LiamKhoaLe commited on
Commit
f44847d
·
1 Parent(s): e0fc7e4

Upd NaN handler with ts control v2

Browse files
Files changed (1) hide show
  1. app.py +43 -42
app.py CHANGED
@@ -112,71 +112,71 @@ def parse_and_filter(raw_rows):
112
  def fill_missing(df):
113
  if df.empty:
114
  return df
115
- # Normalise values
116
  df["timestamp"] = pd.to_datetime(df["timestamp"])
117
  df.sort_values("timestamp", inplace=True)
118
- # Allowance
119
  expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
120
  tol = timedelta(seconds=TOLERANCE_SEC)
121
- # B1: phát hiện chèn các dòng bị thiếu timestamp
122
  rows = [df.iloc[0]]
123
  for i in range(1, len(df)):
124
  prev, curr = df.iloc[i - 1]["timestamp"], df.iloc[i]["timestamp"]
125
  rows.append(df.iloc[i])
126
  if curr - prev > expected + tol:
127
  for j in range(1, int(round((curr - prev) / expected))):
128
- new_ts = prev + j * expected
129
- gap_row = df.iloc[i - 1].copy()
130
- gap_row["timestamp"] = new_ts
131
  for col in ["voltage", "current", "power", "consume"]:
132
- gap_row[col] = np.nan
133
- rows.insert(-1, gap_row)
134
- # Flag and clean
135
- df = pd.DataFrame(rows).sort_values("timestamp")
136
  df["consume_clean"] = df["consume"]
137
- # B2: loại bỏ giá trị bất thường
138
  df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
139
- # B3: nội suy input features bằng KNN
140
  imputer = KNNImputer(n_neighbors=3)
141
- df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]])
142
- # B4: mô hình chính sử dụng 3 input đầu vào
 
 
143
  train = df[df["consume_clean"].notna()]
144
- pred = df[df["consume_clean"].isna()]
 
145
  if not train.empty and not pred.empty:
146
- model = LinearRegression()
147
- model.fit(train[["voltage", "current", "power"]], train["consume_clean"])
 
 
148
  try:
149
- df.loc[pred.index, "consume_clean"] = model.predict(pred[["voltage", "current", "power"]])
150
- except ValueError as e:
151
- logger.warning(f"⚠️ LinearRegression prediction failed on part of data: {e}")
152
- # B5: fallback dự đoán theo timestamp nếu vẫn còn thiếu
 
 
153
  still_missing = df[df["consume_clean"].isna()]
154
  if not still_missing.empty:
155
- logger.warning(f"⚠️ {len(still_missing)} rows still missing consume after model prediction. Using timestamp fallback.")
156
- # Total second computation (temp variable)
157
  df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds()
158
- # Can't be NaN to be trainable
159
- fallback_train = df[df["consume_clean"].notna()]
160
- fallback_pred = df[df["consume_clean"].isna()]
161
- # Rm NaN in timestamp fallback
162
- fallback_pred_valid = fallback_pred[fallback_pred["ts_sec"].notna()]
163
- # Not null -> render model
164
- if not fallback_train.empty and not fallback_pred_valid.empty:
165
- fallback_model = LinearRegression()
166
- fallback_model.fit(fallback_train[["ts_sec"]], fallback_train["consume_clean"])
167
- y_fallback_pred = fallback_model.predict(fallback_pred_valid[["ts_sec"]])
168
- if len(y_fallback_pred) == len(fallback_pred_valid):
169
- df.loc[fallback_pred_valid.index, "consume_clean"] = y_fallback_pred
170
- else:
171
- logger.warning("⚠️ Fallback prediction length mismatch, skipping assignment.")
172
- # Drop ts_sec (temp variable)
173
  df.drop(columns=["ts_sec"], inplace=True)
174
- # B6: cập nhật kết quả cuối cùng
175
  df["consume"] = df["consume_clean"]
176
- logger.info("🧹 Handle missing function proceed")
177
  return df.drop(columns=["consume_clean"])
178
 
179
-
180
  ## MongoDB insertion
181
  def insert_mongo(df):
182
  if df.empty: return
@@ -190,7 +190,7 @@ def insert_mongo(df):
190
  UpdateOne({"_id": r["_id"]}, {"$set": r}, upsert=True) for r in records
191
  ]
192
  col.bulk_write(operations, ordered=False)
193
- logger.info(f"📥 Inserted {len(records)} rows to MongoDB.")
194
  except Exception as e:
195
  logger.error(f"❌ Mongo insert error: {e}")
196
 
@@ -205,6 +205,7 @@ def batch_worker():
205
  if not bundle:
206
  logger.debug("⏱️ No new data this cycle")
207
  continue
 
208
  df_clean = fill_missing(parse_and_filter(bundle))
209
  insert_mongo(df_clean)
210
 
 
112
  def fill_missing(df):
113
  if df.empty:
114
  return df
115
+ # --- Chuẩn hoá thời gian ---
116
  df["timestamp"] = pd.to_datetime(df["timestamp"])
117
  df.sort_values("timestamp", inplace=True)
118
+ # Time interval limitation
119
  expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
120
  tol = timedelta(seconds=TOLERANCE_SEC)
121
+ # --- B1. Chèn bản ghi bị rơi ---
122
  rows = [df.iloc[0]]
123
  for i in range(1, len(df)):
124
  prev, curr = df.iloc[i - 1]["timestamp"], df.iloc[i]["timestamp"]
125
  rows.append(df.iloc[i])
126
  if curr - prev > expected + tol:
127
  for j in range(1, int(round((curr - prev) / expected))):
128
+ gap_ts = prev + j * expected
129
+ gap = df.iloc[i - 1].copy()
130
+ gap["timestamp"] = gap_ts
131
  for col in ["voltage", "current", "power", "consume"]:
132
+ gap[col] = np.nan
133
+ rows.insert(-1, gap)
134
+ # Sorting with ts to be identifier
135
+ df = pd.DataFrame(rows).sort_values("timestamp").reset_index(drop=True)
136
  df["consume_clean"] = df["consume"]
 
137
  df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
138
+ # --- B2. Impute feature ---
139
  imputer = KNNImputer(n_neighbors=3)
140
+ df[["voltage", "current", "power"]] = imputer.fit_transform(
141
+ df[["voltage", "current", "power"]]
142
+ )
143
+ # --- B3. Model chính ---
144
  train = df[df["consume_clean"].notna()]
145
+ pred = df[df["consume_clean"].isna()]
146
+ # NaN and null not valid
147
  if not train.empty and not pred.empty:
148
+ model = LinearRegression().fit(
149
+ train[["voltage", "current", "power"]],
150
+ train["consume_clean"]
151
+ )
152
  try:
153
+ y_hat = model.predict(pred[["voltage", "current", "power"]])
154
+ # Khớp index bằng Series (an toàn với duplicate)
155
+ df.loc[pred.index, "consume_clean"] = pd.Series(y_hat, index=pred.index)
156
+ except Exception as e:
157
+ logger.warning(f"⚠️ Primary model failed partially: {e}")
158
+ # --- B4. Fallback theo timestamp ---
159
  still_missing = df[df["consume_clean"].isna()]
160
  if not still_missing.empty:
161
+ logger.warning(f"⚠️ {len(still_missing)} rows still missing. Using timestamp fallback.")
 
162
  df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds()
163
+ fb_train = df[df["consume_clean"].notna()]
164
+ fb_pred = df[df["consume_clean"].isna()]
165
+ # Chỉ lấy bản ghi có ts_sec hợp lệ & index duy nhất
166
+ fb_pred = fb_pred[fb_pred["ts_sec"].notna()].drop_duplicates(subset="timestamp")
167
+ if not fb_train.empty and not fb_pred.empty:
168
+ fb_model = LinearRegression().fit(
169
+ fb_train[["ts_sec"]], fb_train["consume_clean"]
170
+ )
171
+ y_fb = fb_model.predict(fb_pred[["ts_sec"]])
172
+ df.loc[fb_pred.index, "consume_clean"] = pd.Series(y_fb, index=fb_pred.index)
173
+ # Drop total sec temp var
 
 
 
 
174
  df.drop(columns=["ts_sec"], inplace=True)
175
+ # --- Kết quả cuối ---
176
  df["consume"] = df["consume_clean"]
177
+ logger.info("🧹 fill_missing() hoàn tất làm sạch & khôi phục.")
178
  return df.drop(columns=["consume_clean"])
179
 
 
180
  ## MongoDB insertion
181
  def insert_mongo(df):
182
  if df.empty: return
 
190
  UpdateOne({"_id": r["_id"]}, {"$set": r}, upsert=True) for r in records
191
  ]
192
  col.bulk_write(operations, ordered=False)
193
+ logger.info(f"🪣 Inserted {len(records)} rows to MongoDB.")
194
  except Exception as e:
195
  logger.error(f"❌ Mongo insert error: {e}")
196
 
 
205
  if not bundle:
206
  logger.debug("⏱️ No new data this cycle")
207
  continue
208
+ logger.info("Start")
209
  df_clean = fill_missing(parse_and_filter(bundle))
210
  insert_mongo(df_clean)
211