Update scripts/ingest_epoch.py: speed + balance + correctness fixes
Browse files- scripts/ingest_epoch.py +16 -0
scripts/ingest_epoch.py
CHANGED
|
@@ -394,6 +394,22 @@ def run_etl(
|
|
| 394 |
# Step 4: Neo4j dump
|
| 395 |
neo4j_path = dest_dir / NEO4J_FILENAME.format(epoch=epoch)
|
| 396 |
if neo4j_path.exists() and not skip_neo4j:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 397 |
if merge_neo4j:
|
| 398 |
merge_neo4j_epoch_dump(epoch, neo4j_path, dry_run=dry_run)
|
| 399 |
else:
|
|
|
|
| 394 |
# Step 4: Neo4j dump
|
| 395 |
neo4j_path = dest_dir / NEO4J_FILENAME.format(epoch=epoch)
|
| 396 |
if neo4j_path.exists() and not skip_neo4j:
|
| 397 |
+
# Auto-detect: if target DB is empty, use direct load instead of merge
|
| 398 |
+
# This prevents the merge-on-empty-DB hang when the first epoch uses --merge-neo4j
|
| 399 |
+
if merge_neo4j and not dry_run:
|
| 400 |
+
try:
|
| 401 |
+
_driver = _neo4j_driver()
|
| 402 |
+
with _driver.session(database=NEO4J_TARGET_DB) as _session:
|
| 403 |
+
_count = _session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
|
| 404 |
+
_driver.close()
|
| 405 |
+
if _count == 0:
|
| 406 |
+
print(f" [AUTO] Target DB is empty ({_count} nodes). Switching to direct load instead of merge.")
|
| 407 |
+
merge_neo4j = False
|
| 408 |
+
else:
|
| 409 |
+
print(f" [AUTO] Target DB has {_count} nodes. Proceeding with merge.")
|
| 410 |
+
except Exception as e:
|
| 411 |
+
print(f" [WARN] Could not check target DB node count: {e}. Proceeding with merge flag as-is.")
|
| 412 |
+
|
| 413 |
if merge_neo4j:
|
| 414 |
merge_neo4j_epoch_dump(epoch, neo4j_path, dry_run=dry_run)
|
| 415 |
else:
|