Fix: Neo4j permissions + stop-on-failure (scripts/ingest_epoch.py)
Browse files- scripts/ingest_epoch.py +17 -3
scripts/ingest_epoch.py
CHANGED
|
@@ -411,11 +411,15 @@ def run_etl(
|
|
| 411 |
print(f" [WARN] Could not check target DB node count: {e}. Proceeding with merge flag as-is.")
|
| 412 |
|
| 413 |
if merge_neo4j:
|
| 414 |
-
merge_neo4j_epoch_dump(epoch, neo4j_path, dry_run=dry_run)
|
|
|
|
|
|
|
|
|
|
| 415 |
else:
|
| 416 |
ok = ingest_neo4j_dump(neo4j_path, database=NEO4J_TARGET_DB, dry_run=dry_run)
|
| 417 |
if not ok:
|
| 418 |
-
print(" ❌ Neo4j dump load failed.")
|
|
|
|
| 419 |
elif neo4j_path.exists() and skip_neo4j:
|
| 420 |
print(f"\nℹ️ Neo4j dump found but skipped: {neo4j_path}")
|
| 421 |
|
|
@@ -495,6 +499,9 @@ def ingest_neo4j_dump(dump_path: Path, database: str = "neo4j", dry_run: bool =
|
|
| 495 |
print(f" ❌ Failed to stop Neo4j: {stop_result.stderr.strip()}")
|
| 496 |
return False
|
| 497 |
_ensure_neo4j_data_writable()
|
|
|
|
|
|
|
|
|
|
| 498 |
|
| 499 |
if dry_run:
|
| 500 |
print(f" [DRY-RUN] {' '.join(cmd)}")
|
|
@@ -700,7 +707,7 @@ def _stream_merge(
|
|
| 700 |
_bulk_merge_apoc(temp_session, target_session, match_query, merge_query, label, total)
|
| 701 |
|
| 702 |
|
| 703 |
-
def merge_neo4j_epoch_dump(epoch: int, dump_path: Path, dry_run: bool = False) ->
|
| 704 |
"""
|
| 705 |
Merge relationships from an epoch dump into the target DB by keeping the oldest timestamp.
|
| 706 |
Relationship uniqueness is enforced by (start, end, type) only.
|
|
@@ -792,6 +799,8 @@ def merge_neo4j_epoch_dump(epoch: int, dump_path: Path, dry_run: bool = False) -
|
|
| 792 |
import subprocess
|
| 793 |
try:
|
| 794 |
subprocess.run(["chown", "-R", "neo4j:adm", str(temp_root)], check=True)
|
|
|
|
|
|
|
| 795 |
except Exception:
|
| 796 |
pass
|
| 797 |
|
|
@@ -950,8 +959,13 @@ def merge_neo4j_epoch_dump(epoch: int, dump_path: Path, dry_run: bool = False) -
|
|
| 950 |
process_wallet_token_rel(rel_type)
|
| 951 |
|
| 952 |
print(" ✅ Merge complete.")
|
|
|
|
| 953 |
except subprocess.CalledProcessError as e:
|
| 954 |
print(f" ❌ Failed to merge Neo4j dump: {e.stderr}")
|
|
|
|
|
|
|
|
|
|
|
|
|
| 955 |
finally:
|
| 956 |
if temp_driver:
|
| 957 |
temp_driver.close()
|
|
|
|
| 411 |
print(f" [WARN] Could not check target DB node count: {e}. Proceeding with merge flag as-is.")
|
| 412 |
|
| 413 |
if merge_neo4j:
|
| 414 |
+
ok = merge_neo4j_epoch_dump(epoch, neo4j_path, dry_run=dry_run)
|
| 415 |
+
if not ok:
|
| 416 |
+
print(" ❌ Neo4j merge failed. Aborting.")
|
| 417 |
+
sys.exit(1)
|
| 418 |
else:
|
| 419 |
ok = ingest_neo4j_dump(neo4j_path, database=NEO4J_TARGET_DB, dry_run=dry_run)
|
| 420 |
if not ok:
|
| 421 |
+
print(" ❌ Neo4j dump load failed. Aborting.")
|
| 422 |
+
sys.exit(1)
|
| 423 |
elif neo4j_path.exists() and skip_neo4j:
|
| 424 |
print(f"\nℹ️ Neo4j dump found but skipped: {neo4j_path}")
|
| 425 |
|
|
|
|
| 499 |
print(f" ❌ Failed to stop Neo4j: {stop_result.stderr.strip()}")
|
| 500 |
return False
|
| 501 |
_ensure_neo4j_data_writable()
|
| 502 |
+
# Explicit chown to fix permission mismatch when Neo4j was started as root
|
| 503 |
+
if os.geteuid() == 0:
|
| 504 |
+
subprocess.run(["chown", "-R", "neo4j:neo4j", "/var/lib/neo4j/data"], check=False)
|
| 505 |
|
| 506 |
if dry_run:
|
| 507 |
print(f" [DRY-RUN] {' '.join(cmd)}")
|
|
|
|
| 707 |
_bulk_merge_apoc(temp_session, target_session, match_query, merge_query, label, total)
|
| 708 |
|
| 709 |
|
| 710 |
+
def merge_neo4j_epoch_dump(epoch: int, dump_path: Path, dry_run: bool = False) -> bool:
|
| 711 |
"""
|
| 712 |
Merge relationships from an epoch dump into the target DB by keeping the oldest timestamp.
|
| 713 |
Relationship uniqueness is enforced by (start, end, type) only.
|
|
|
|
| 799 |
import subprocess
|
| 800 |
try:
|
| 801 |
subprocess.run(["chown", "-R", "neo4j:adm", str(temp_root)], check=True)
|
| 802 |
+
# Also fix main Neo4j data dir permissions (lock file may be owned by root)
|
| 803 |
+
subprocess.run(["chown", "-R", "neo4j:neo4j", "/var/lib/neo4j/data"], check=False)
|
| 804 |
except Exception:
|
| 805 |
pass
|
| 806 |
|
|
|
|
| 959 |
process_wallet_token_rel(rel_type)
|
| 960 |
|
| 961 |
print(" ✅ Merge complete.")
|
| 962 |
+
return True
|
| 963 |
except subprocess.CalledProcessError as e:
|
| 964 |
print(f" ❌ Failed to merge Neo4j dump: {e.stderr}")
|
| 965 |
+
return False
|
| 966 |
+
except Exception as e:
|
| 967 |
+
print(f" ❌ Failed to merge Neo4j dump: {e}")
|
| 968 |
+
return False
|
| 969 |
finally:
|
| 970 |
if temp_driver:
|
| 971 |
temp_driver.close()
|