rcai committed on
Commit
710de3c
·
verified ·
1 Parent(s): 3fa4af9

Update test.py

Browse files
Files changed (1) hide show
  1. test.py +116 -0
test.py CHANGED
@@ -392,3 +392,119 @@ Step 11. **HARD CONSTRAINT – Secondary cancer aggregation (windowed by the sel
392
 
393
  "secondary_cancer_types_within_30d_of_progression": []
394
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
392
 
393
  "secondary_cancer_types_within_30d_of_progression": []
394
 
395
+
396
+ from datetime import datetime, timedelta
397
+ from typing import List, Dict, Any, Optional, Tuple
398
+ import re
399
+
400
+ # --- helpers ---------------------------------------------------------------
401
+
402
def parse_date(s: Optional[str]) -> Optional[datetime]:
    """Parse a date string written in one of several common layouts.

    Layouts tried, in order: ``YYYY-MM-DD``, ``YYYY/MM/DD``, ``M/D/YYYY``,
    ``D-Mon-YYYY``, ``D-Month-YYYY`` and finally a two-digit-year ``M/D/YY``
    fallback (years 00-49 map to 2000-2049, years 50-99 to 1950-1999).

    Args:
        s: Raw date text. Leading/trailing whitespace is ignored.

    Returns:
        A naive ``datetime`` (time fields left at their defaults), or ``None``
        when ``s`` is empty, is not a string, or is not a valid date in any of
        the supported layouts.
    """
    if not s or not isinstance(s, str):
        return None
    s = s.strip()

    # NOTE: %b / %B month names are matched according to the active locale.
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y", "%d-%b-%Y", "%d-%B-%Y"):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            # Layout did not match (or the date was invalid); try the next one.
            continue

    # Two-digit-year fallback (M/D/YY) at the end of the text. The lookbehind
    # stops the pattern from latching onto the tail of a longer digit or slash
    # run: without it "112/4/24" was silently read as 12/4/24.
    m = re.search(r"(?<![\d/])(\d{1,2})/(\d{1,2})/(\d{2})$", s)
    if m is None:
        return None
    mm, dd, yy = map(int, m.groups())
    yy += 2000 if yy < 50 else 1900
    try:
        return datetime(yy, mm, dd)
    except ValueError:
        # Out-of-range month/day such as 2/30/24.
        return None
423
+
424
def iso(d: Optional[datetime]) -> str:
    """Format ``d`` as ``YYYY-MM-DD``.

    Args:
        d: The date to format, or ``None``.

    Returns:
        The zero-padded ``YYYY-MM-DD`` text, or an empty string when ``d`` is
        ``None``.
    """
    if not d:
        return ""
    # Build the text from the fields instead of strftime("%Y-%m-%d"): on some
    # platforms (e.g. glibc) %Y is not zero-padded for years below 1000, which
    # would produce "999-01-02" rather than "0999-01-02".
    return f"{d.year:04d}-{d.month:02d}-{d.day:02d}"
426
+
427
def anchor_date_for_event(ev: Dict[str, Any]) -> Optional[datetime]:
    """Resolve the anchor date D for an event (the "Step-13 anchor").

    The first candidate that parses successfully wins, in this order:

    1. ``ev["date_of_disease_progression_assessment"]``
    2. ``ev["treatment_change"]["start_date_of_treatment"]``
       (a missing or falsy ``treatment_change`` is treated as empty)
    3. ``ev["date_of_secondary_cancer_diagnosis"]``

    Returns:
        The parsed anchor date, or ``None`` when no candidate parses.
    """
    # `or` short-circuits, so later candidates are only looked up and parsed
    # when every earlier one failed to yield a date.
    return (
        parse_date(ev.get("date_of_disease_progression_assessment"))
        or parse_date((ev.get("treatment_change") or {}).get("start_date_of_treatment"))
        or parse_date(ev.get("date_of_secondary_cancer_diagnosis"))
    )
441
+
442
def collect_secondary_pool(observations: List[Dict[str, Any]]) -> List[Tuple[str, datetime]]:
    """Gather every usable (secondary_cancer_type, diagnosis_date) pair.

    For each observation the ``secondary_cancer_type`` value is stripped of
    surrounding whitespace and ``date_of_secondary_cancer_diagnosis`` is run
    through ``parse_date``. Observations with an empty type or a date that
    does not parse are skipped.

    Returns:
        The pairs in the same order as ``observations``; duplicates are kept.
    """
    candidates = (
        (
            (ob.get("secondary_cancer_type") or "").strip(),
            parse_date(ob.get("date_of_secondary_cancer_diagnosis")),
        )
        for ob in observations
    )
    return [(name, diagnosed) for name, diagnosed in candidates if name and diagnosed]
451
+
452
+ # --- main aggregation ------------------------------------------------------
453
+
454
def aggregate_secondary_types_within_30d(
    observations: List[Dict[str, Any]],
    id_field: str = "segment_id",
) -> List[Dict[str, Any]]:
    """Annotate every event with the secondary tumor types diagnosed near it.

    The pool of (type, diagnosis date) pairs is collected once from all
    ``observations``. Each event then gets the distinct types whose diagnosis
    date lies within 30 days (inclusive, either side) of that event's anchor
    date, as resolved by ``anchor_date_for_event``.

    Args:
        observations: Event dictionaries. They are shallow-copied, so the
            input list and its dictionaries are not modified.
        id_field: Accepted for interface compatibility; not read by this
            function.

    Returns:
        A new list with one copied dictionary per event, each carrying:

        - ``secondary_cancer_types_within_30d_of_progression``: the matching
          types, sorted case-insensitively (empty when the event has no
          anchor date);
        - ``secondary_cancer_types_within_30d_of_progression_csv``: the same
          types joined with ``", "``;
        - ``date_of_disease_progression_assessment``: when a truthy value is
          present it is replaced by ``iso(parse_date(value))``, which is an
          empty string if the value cannot be parsed.
    """
    progression_key = "date_of_disease_progression_assessment"
    diagnoses = collect_secondary_pool(observations)
    window = timedelta(days=30)
    annotated: List[Dict[str, Any]] = []

    for event in observations:
        enriched = dict(event)  # shallow copy so the caller's dict is untouched
        anchor = anchor_date_for_event(enriched)

        # Normalize the progression date only after the anchor was resolved
        # from the raw values.
        raw_progression = enriched.get(progression_key)
        if raw_progression:
            enriched[progression_key] = iso(parse_date(raw_progression))

        if anchor:
            earliest, latest = anchor - window, anchor + window
            in_window = {name for name, diagnosed in diagnoses if earliest <= diagnosed <= latest}
            matched = sorted(in_window, key=str.lower)
        else:
            matched = []

        enriched["secondary_cancer_types_within_30d_of_progression"] = matched
        enriched["secondary_cancer_types_within_30d_of_progression_csv"] = ", ".join(matched)
        annotated.append(enriched)

    return annotated
491
+
492
+ # --- optional convenience: aggregate for a single assessment date 'y' -----
493
+
494
def aggregate_for_assessment_date_y(
    observations: List[Dict[str, Any]],
    y: str,  # e.g., "2024-04-13" or "4/13/2024"
) -> List[str]:
    """List the secondary tumor types diagnosed within 30 days of date ``y``.

    Args:
        observations: Event dictionaries scanned for
            (secondary_cancer_type, diagnosis date) pairs.
        y: The disease-progression assessment date, in any layout that
            ``parse_date`` understands.

    Returns:
        The distinct types whose diagnosis date is within 30 days (inclusive,
        either side) of ``y``, sorted case-insensitively. Empty when ``y``
        cannot be parsed.
    """
    center = parse_date(y)
    if not center:
        return []

    window = timedelta(days=30)
    earliest = center - window
    latest = center + window

    matched = set()
    for name, diagnosed in collect_secondary_pool(observations):
        if earliest <= diagnosed <= latest:
            matched.add(name)
    return sorted(matched, key=str.lower)
508
+
509
# Demo call: print the secondary tumor types found around one assessment date.
# NOTE(review): `observations` is not defined in the visible part of this
# file; confirm it is assigned before this statement runs.
print(
    "For assessment date 2024-04-13:",
    aggregate_for_assessment_date_y(observations, "2024-04-13"),
)