Upload data/data_fetcher.py with huggingface_hub
Browse files
- data/data_fetcher.py +4 -15
data/data_fetcher.py
CHANGED
|
@@ -445,12 +445,13 @@ class DataFetcher:
|
|
| 445 |
_t_query_start = time.perf_counter()
|
| 446 |
|
| 447 |
# Cypher query to find direct neighbors of the current frontier
|
|
|
|
| 448 |
query = """
|
| 449 |
MATCH (a)-[r]-(b)
|
| 450 |
-
WHERE a.address IN $addresses
|
| 451 |
RETURN a.address AS source_address, type(r) AS link_type, properties(r) AS link_props, b.address AS dest_address, labels(b)[0] AS dest_type
|
| 452 |
"""
|
| 453 |
-
params = {'addresses': list(newly_found_entities)}
|
| 454 |
result = session.run(query, params)
|
| 455 |
|
| 456 |
_t_query_done = time.perf_counter()
|
|
@@ -458,23 +459,12 @@ class DataFetcher:
|
|
| 458 |
# --- TIMING: Result processing ---
|
| 459 |
_t_process_start = time.perf_counter()
|
| 460 |
records_total = 0
|
| 461 |
-
records_filtered_by_time = 0
|
| 462 |
-
records_kept = 0
|
| 463 |
|
| 464 |
current_degree_new_entities = set()
|
| 465 |
for record in result:
|
| 466 |
records_total += 1
|
| 467 |
link_type = record['link_type']
|
| 468 |
link_props = dict(record['link_props'])
|
| 469 |
-
link_ts_raw = link_props.get('timestamp')
|
| 470 |
-
try:
|
| 471 |
-
link_ts = int(link_ts_raw)
|
| 472 |
-
except (TypeError, ValueError):
|
| 473 |
-
continue
|
| 474 |
-
if link_ts > cutoff_ts:
|
| 475 |
-
records_filtered_by_time += 1
|
| 476 |
-
continue
|
| 477 |
-
records_kept += 1
|
| 478 |
source_addr = record['source_address']
|
| 479 |
dest_addr = record['dest_address']
|
| 480 |
dest_type = record['dest_type']
|
|
@@ -493,8 +483,7 @@ class DataFetcher:
|
|
| 493 |
# --- TIMING: Print detailed stats ---
|
| 494 |
print(f" [NEO4J TIMING] query_exec: {(_t_query_done - _t_query_start)*1000:.1f}ms, "
|
| 495 |
f"result_process: {(_t_process_done - _t_process_start)*1000:.1f}ms")
|
| 496 |
-
print(f" [NEO4J STATS]
|
| 497 |
-
f"filtered_by_time: {records_filtered_by_time}, kept: {records_kept}, "
|
| 498 |
f"new_entities: {len(current_degree_new_entities)}")
|
| 499 |
|
| 500 |
newly_found_entities = current_degree_new_entities
|
|
|
|
| 445 |
_t_query_start = time.perf_counter()
|
| 446 |
|
| 447 |
# Cypher query to find direct neighbors of the current frontier
|
| 448 |
+
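# OPTIMIZED: Apply the timestamp cutoff inside the Neo4j query (server-side) to avoid transferring the 97%+ of records that would otherwise be discarded; relationships with no timestamp are excluded too, since a comparison against null never matches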
|
| 449 |
query = """
|
| 450 |
MATCH (a)-[r]-(b)
|
| 451 |
+
WHERE a.address IN $addresses AND r.timestamp <= $cutoff_ts
|
| 452 |
RETURN a.address AS source_address, type(r) AS link_type, properties(r) AS link_props, b.address AS dest_address, labels(b)[0] AS dest_type
|
| 453 |
"""
|
| 454 |
+
params = {'addresses': list(newly_found_entities), 'cutoff_ts': cutoff_ts}
|
| 455 |
result = session.run(query, params)
|
| 456 |
|
| 457 |
_t_query_done = time.perf_counter()
|
|
|
|
| 459 |
# --- TIMING: Result processing ---
|
| 460 |
_t_process_start = time.perf_counter()
|
| 461 |
records_total = 0
|
|
|
|
|
|
|
| 462 |
|
| 463 |
current_degree_new_entities = set()
|
| 464 |
for record in result:
|
| 465 |
records_total += 1
|
| 466 |
link_type = record['link_type']
|
| 467 |
link_props = dict(record['link_props'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 468 |
source_addr = record['source_address']
|
| 469 |
dest_addr = record['dest_address']
|
| 470 |
dest_type = record['dest_type']
|
|
|
|
| 483 |
# --- TIMING: Print detailed stats ---
|
| 484 |
print(f" [NEO4J TIMING] query_exec: {(_t_query_done - _t_query_start)*1000:.1f}ms, "
|
| 485 |
f"result_process: {(_t_process_done - _t_process_start)*1000:.1f}ms")
|
| 486 |
+
print(f" [NEO4J STATS] records_returned: {records_total}, "
|
|
|
|
| 487 |
f"new_entities: {len(current_degree_new_entities)}")
|
| 488 |
|
| 489 |
newly_found_entities = current_degree_new_entities
|