Spaces:
Sleeping
Sleeping
Commit
·
d2ffc90
1
Parent(s):
0a6246a
Update fillmissing with ts fallback on failed 3 fields pred
Browse files
app.py
CHANGED
|
@@ -110,39 +110,67 @@ def parse_and_filter(raw_rows):
|
|
| 110 |
|
| 111 |
## Detect and fill missing
|
| 112 |
def fill_missing(df):
|
| 113 |
-
if df.empty:
|
|
|
|
|
|
|
| 114 |
df["timestamp"] = pd.to_datetime(df["timestamp"])
|
| 115 |
df.sort_values("timestamp", inplace=True)
|
|
|
|
| 116 |
expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
|
| 117 |
tol = timedelta(seconds=TOLERANCE_SEC)
|
|
|
|
| 118 |
rows = [df.iloc[0]]
|
| 119 |
for i in range(1, len(df)):
|
| 120 |
-
prev, curr = df.iloc[i-1]["timestamp"], df.iloc[i]["timestamp"]
|
| 121 |
rows.append(df.iloc[i])
|
| 122 |
if curr - prev > expected + tol:
|
| 123 |
for j in range(1, int(round((curr - prev) / expected))):
|
| 124 |
new_ts = prev + j * expected
|
| 125 |
-
gap_row = df.iloc[i-1].copy()
|
| 126 |
gap_row["timestamp"] = new_ts
|
| 127 |
for col in ["voltage", "current", "power", "consume"]:
|
| 128 |
gap_row[col] = np.nan
|
| 129 |
rows.insert(-1, gap_row)
|
|
|
|
| 130 |
df = pd.DataFrame(rows).sort_values("timestamp")
|
| 131 |
df["consume_clean"] = df["consume"]
|
|
|
|
| 132 |
df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
|
| 133 |
-
#
|
| 134 |
imputer = KNNImputer(n_neighbors=3)
|
| 135 |
df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]])
|
| 136 |
-
#
|
| 137 |
train = df[df["consume_clean"].notna()]
|
| 138 |
pred = df[df["consume_clean"].isna()]
|
| 139 |
if not train.empty and not pred.empty:
|
| 140 |
-
model = LinearRegression()
|
| 141 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 142 |
df["consume"] = df["consume_clean"]
|
| 143 |
logger.info("🧹 Handle missing function proceed")
|
| 144 |
return df.drop(columns=["consume_clean"])
|
| 145 |
|
|
|
|
| 146 |
## MongoDB insertion
|
| 147 |
def insert_mongo(df):
|
| 148 |
if df.empty: return
|
|
|
|
| 110 |
|
| 111 |
## Detect and fill missing
|
| 112 |
def fill_missing(df):
|
| 113 |
+
if df.empty:
|
| 114 |
+
return df
|
| 115 |
+
# Normalise values
|
| 116 |
df["timestamp"] = pd.to_datetime(df["timestamp"])
|
| 117 |
df.sort_values("timestamp", inplace=True)
|
| 118 |
+
# Allowance
|
| 119 |
expected = timedelta(seconds=EXPECTED_INTERVAL_SEC)
|
| 120 |
tol = timedelta(seconds=TOLERANCE_SEC)
|
| 121 |
+
# B1: phát hiện và chèn các dòng bị thiếu timestamp
|
| 122 |
rows = [df.iloc[0]]
|
| 123 |
for i in range(1, len(df)):
|
| 124 |
+
prev, curr = df.iloc[i - 1]["timestamp"], df.iloc[i]["timestamp"]
|
| 125 |
rows.append(df.iloc[i])
|
| 126 |
if curr - prev > expected + tol:
|
| 127 |
for j in range(1, int(round((curr - prev) / expected))):
|
| 128 |
new_ts = prev + j * expected
|
| 129 |
+
gap_row = df.iloc[i - 1].copy()
|
| 130 |
gap_row["timestamp"] = new_ts
|
| 131 |
for col in ["voltage", "current", "power", "consume"]:
|
| 132 |
gap_row[col] = np.nan
|
| 133 |
rows.insert(-1, gap_row)
|
| 134 |
+
# Flag and clean
|
| 135 |
df = pd.DataFrame(rows).sort_values("timestamp")
|
| 136 |
df["consume_clean"] = df["consume"]
|
| 137 |
+
# B2: loại bỏ giá trị bất thường
|
| 138 |
df.loc[(df["consume"] < 0) | (df["consume"].diff() < 0), "consume_clean"] = np.nan
|
| 139 |
+
# B3: nội suy input features bằng KNN
|
| 140 |
imputer = KNNImputer(n_neighbors=3)
|
| 141 |
df[["voltage", "current", "power"]] = imputer.fit_transform(df[["voltage", "current", "power"]])
|
| 142 |
+
# B4: mô hình chính sử dụng 3 input đầu vào
|
| 143 |
train = df[df["consume_clean"].notna()]
|
| 144 |
pred = df[df["consume_clean"].isna()]
|
| 145 |
if not train.empty and not pred.empty:
|
| 146 |
+
model = LinearRegression()
|
| 147 |
+
model.fit(train[["voltage", "current", "power"]], train["consume_clean"])
|
| 148 |
+
try:
|
| 149 |
+
df.loc[pred.index, "consume_clean"] = model.predict(pred[["voltage", "current", "power"]])
|
| 150 |
+
except ValueError as e:
|
| 151 |
+
logger.warning(f"⚠️ LinearRegression prediction failed on part of data: {e}")
|
| 152 |
+
# B5: fallback dự đoán theo timestamp nếu vẫn còn thiếu
|
| 153 |
+
still_missing = df[df["consume_clean"].isna()]
|
| 154 |
+
if not still_missing.empty:
|
| 155 |
+
logger.warning(f"⚠️ {len(still_missing)} rows still missing consume after model prediction. Using timestamp fallback.")
|
| 156 |
+
# Total time (ts_sec)
|
| 157 |
+
df["ts_sec"] = (df["timestamp"] - df["timestamp"].min()).dt.total_seconds()
|
| 158 |
+
# Normalize
|
| 159 |
+
fallback_train = df[df["consume_clean"].notna()]
|
| 160 |
+
fallback_pred = df[df["consume_clean"].isna()]
|
| 161 |
+
# Fallback
|
| 162 |
+
if not fallback_train.empty and not fallback_pred.empty:
|
| 163 |
+
fallback_model = LinearRegression()
|
| 164 |
+
fallback_model.fit(fallback_train[["ts_sec"]], fallback_train["consume_clean"])
|
| 165 |
+
df.loc[fallback_pred.index, "consume_clean"] = fallback_model.predict(fallback_pred[["ts_sec"]])
|
| 166 |
+
# Drop ts_sec
|
| 167 |
+
df.drop(columns=["ts_sec"], inplace=True)
|
| 168 |
+
# B6: cập nhật kết quả cuối cùng
|
| 169 |
df["consume"] = df["consume_clean"]
|
| 170 |
logger.info("🧹 Handle missing function proceed")
|
| 171 |
return df.drop(columns=["consume_clean"])
|
| 172 |
|
| 173 |
+
|
| 174 |
## MongoDB insertion
|
| 175 |
def insert_mongo(df):
|
| 176 |
if df.empty: return
|