Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
Commit
·
756e837
1
Parent(s):
ff54ca8
refactor: optimize database setup logging and limit sample sizes for clarity
Browse files
main.py
CHANGED
|
@@ -148,12 +148,10 @@ def setup_database():
|
|
| 148 |
sample_df = (
|
| 149 |
df.select(["datasetId", "last_modified"])
|
| 150 |
.sort("last_modified", descending=True)
|
| 151 |
-
.limit(
|
| 152 |
.collect()
|
| 153 |
)
|
| 154 |
-
logger.info("Sample of most recent incoming records:")
|
| 155 |
-
for row in sample_df.iter_rows():
|
| 156 |
-
logger.info(f" {row[0]}: {row[1]}")
|
| 157 |
|
| 158 |
if latest_update:
|
| 159 |
logger.info(f"Filtering records newer than {latest_update}")
|
|
@@ -183,13 +181,11 @@ def setup_database():
|
|
| 183 |
df.select(["datasetId", "last_modified"])
|
| 184 |
.filter(pl.col("last_modified") <= latest_update)
|
| 185 |
.sort("last_modified", descending=True)
|
| 186 |
-
.limit(
|
| 187 |
.collect()
|
| 188 |
)
|
| 189 |
if len(just_before) > 0:
|
| 190 |
-
logger.info("Records just before cutoff:")
|
| 191 |
-
for row in just_before.iter_rows():
|
| 192 |
-
logger.info(f" {row[0]}: {row[1]}")
|
| 193 |
|
| 194 |
df = df.collect()
|
| 195 |
total_rows = len(df)
|
|
@@ -210,20 +206,18 @@ def setup_database():
|
|
| 210 |
|
| 211 |
ids_to_upsert = batch_df.select(["datasetId"]).to_series().to_list()
|
| 212 |
|
| 213 |
-
# Log
|
| 214 |
-
|
|
|
|
| 215 |
|
| 216 |
-
# Check if any of these already exist
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
|
| 220 |
-
if existing_check["ids"]:
|
| 221 |
-
logger.info(
|
| 222 |
-
f"Found {len(existing_check['ids'])} existing records in this batch sample"
|
| 223 |
)
|
| 224 |
-
|
| 225 |
logger.info(
|
| 226 |
-
f"
|
| 227 |
)
|
| 228 |
|
| 229 |
dataset_collection.upsert(
|
|
@@ -293,27 +287,39 @@ def setup_database():
|
|
| 293 |
model_latest_update = max(model_last_modifieds)
|
| 294 |
logger.info(f"Most recent model record in DB from: {model_latest_update}")
|
| 295 |
|
| 296 |
-
#
|
| 297 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 298 |
if model_latest_update:
|
| 299 |
-
|
| 300 |
-
|
| 301 |
-
|
| 302 |
-
|
| 303 |
-
|
| 304 |
-
|
| 305 |
-
|
| 306 |
-
|
| 307 |
-
if "param_count" in schema:
|
| 308 |
-
logger.info("Found 'param_count' column in model data schema.")
|
| 309 |
-
select_columns.append("param_count")
|
| 310 |
-
else:
|
| 311 |
-
logger.warning(
|
| 312 |
-
"'param_count' column not found in model data schema. Will add it with null values."
|
| 313 |
-
)
|
| 314 |
|
| 315 |
-
|
| 316 |
-
model_df =
|
| 317 |
|
| 318 |
# If param_count was not in the original schema, add it now to the collected DataFrame
|
| 319 |
if "param_count" not in model_df.columns:
|
|
@@ -322,6 +328,7 @@ def setup_database():
|
|
| 322 |
)
|
| 323 |
|
| 324 |
total_rows = len(model_df)
|
|
|
|
| 325 |
|
| 326 |
for i in range(0, total_rows, BATCH_SIZE):
|
| 327 |
batch_df = model_df.slice(i, min(BATCH_SIZE, total_rows - i))
|
|
|
|
| 148 |
sample_df = (
|
| 149 |
df.select(["datasetId", "last_modified"])
|
| 150 |
.sort("last_modified", descending=True)
|
| 151 |
+
.limit(5)
|
| 152 |
.collect()
|
| 153 |
)
|
| 154 |
+
logger.info(f"Sample of most recent incoming records: {sample_df.rows()[:3]}")
|
|
|
|
|
|
|
| 155 |
|
| 156 |
if latest_update:
|
| 157 |
logger.info(f"Filtering records newer than {latest_update}")
|
|
|
|
| 181 |
df.select(["datasetId", "last_modified"])
|
| 182 |
.filter(pl.col("last_modified") <= latest_update)
|
| 183 |
.sort("last_modified", descending=True)
|
| 184 |
+
.limit(3)
|
| 185 |
.collect()
|
| 186 |
)
|
| 187 |
if len(just_before) > 0:
|
| 188 |
+
logger.info(f"Records just before cutoff: {just_before.rows()}")
|
|
|
|
|
|
|
| 189 |
|
| 190 |
df = df.collect()
|
| 191 |
total_rows = len(df)
|
|
|
|
| 206 |
|
| 207 |
ids_to_upsert = batch_df.select(["datasetId"]).to_series().to_list()
|
| 208 |
|
| 209 |
+
# Log progress for the first batch and every 5th batch thereafter
|
| 210 |
+
if i == 0 or (i // BATCH_SIZE + 1) % 5 == 0: # Log every 5th batch
|
| 211 |
+
logger.info(f"Upserting batch {i // BATCH_SIZE + 1} (sample IDs: {ids_to_upsert[:3]})")
|
| 212 |
|
| 213 |
+
# Check if any of these already exist (sample only)
|
| 214 |
+
if i == 0: # Only log for first batch to reduce noise
|
| 215 |
+
existing_check = dataset_collection.get(
|
| 216 |
+
ids=ids_to_upsert[:3], include=["metadatas"]
|
|
|
|
|
|
|
|
|
|
| 217 |
)
|
| 218 |
+
if existing_check["ids"]:
|
| 219 |
logger.info(
|
| 220 |
+
f"Sample: {len(existing_check['ids'])} existing records being updated"
|
| 221 |
)
|
| 222 |
|
| 223 |
dataset_collection.upsert(
|
|
|
|
| 287 |
model_latest_update = max(model_last_modifieds)
|
| 288 |
logger.info(f"Most recent model record in DB from: {model_latest_update}")
|
| 289 |
|
| 290 |
+
# Set up model schema columns
|
| 291 |
+
schema = model_lazy_df.collect_schema()
|
| 292 |
+
select_columns = [
|
| 293 |
+
"modelId",
|
| 294 |
+
"summary",
|
| 295 |
+
"likes",
|
| 296 |
+
"downloads",
|
| 297 |
+
"last_modified",
|
| 298 |
+
]
|
| 299 |
+
if "param_count" in schema:
|
| 300 |
+
logger.info("Found 'param_count' column in model data schema.")
|
| 301 |
+
select_columns.append("param_count")
|
| 302 |
+
else:
|
| 303 |
+
logger.warning(
|
| 304 |
+
"'param_count' column not found in model data schema. Will add it with null values."
|
| 305 |
+
)
|
| 306 |
+
|
| 307 |
+
# Filter and process only newer model records
|
| 308 |
+
model_df = model_lazy_df.select(select_columns)
|
| 309 |
+
|
| 310 |
+
# Apply timestamp filtering like we do for datasets
|
| 311 |
if model_latest_update:
|
| 312 |
+
logger.info(f"Filtering model records newer than {model_latest_update}")
|
| 313 |
+
model_df = model_df.with_columns(pl.col("last_modified").str.to_datetime())
|
| 314 |
+
model_df = model_df.filter(pl.col("last_modified") > model_latest_update)
|
| 315 |
+
model_filtered_count = model_df.select(pl.len()).collect().item()
|
| 316 |
+
logger.info(f"Found {model_filtered_count} model records to update after filtering")
|
| 317 |
+
else:
|
| 318 |
+
model_filtered_count = model_df.select(pl.len()).collect().item()
|
| 319 |
+
logger.info(f"Initial model load: processing all {model_filtered_count} model records")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 320 |
|
| 321 |
+
if model_filtered_count > 0:
|
| 322 |
+
model_df = model_df.collect()
|
| 323 |
|
| 324 |
# If param_count was not in the original schema, add it now to the collected DataFrame
|
| 325 |
if "param_count" not in model_df.columns:
|
|
|
|
| 328 |
)
|
| 329 |
|
| 330 |
total_rows = len(model_df)
|
| 331 |
+
logger.info(f"Updating model collection with {total_rows} new records")
|
| 332 |
|
| 333 |
for i in range(0, total_rows, BATCH_SIZE):
|
| 334 |
batch_df = model_df.slice(i, min(BATCH_SIZE, total_rows - i))
|