# NeoLUS-AI / pipeline/rules.py
# Ryonaly's picture
# Create pipeline/rules.py
# 1ba1783 verified
from dataclasses import dataclass
from typing import List, Optional
import statistics, math
@dataclass
class FrameFeatures:
    """Features extracted from a single frame.

    Only ``b_count`` is required; the remaining fields fall back to
    neutral defaults (no area, no consolidation, full quality).
    """

    # Count reported for this frame (presumably B-lines -- confirm with
    # the upstream detector).
    b_count: int
    # Fractional area value; compared against the coalescent threshold
    # after zone aggregation.
    b_area_frac: float = 0.0
    # Whether a consolidation was flagged on this frame.
    has_consolidation: bool = False
    # Frame quality; aggregate_zone drops frames whose quality is below
    # its ``min_quality`` argument.
    quality: float = 1.0
@dataclass
class ZoneFeatures:
    """Zone-level features, one record per zone.

    All three fields are required; instances are what ``zone_lus_score``
    reads to produce a per-zone score.
    """

    # Zone-level count, compared against ``b_count_threshold`` when scoring.
    b_count: int
    # Zone-level area fraction, compared against
    # ``coalescent_area_threshold`` when scoring.
    b_area_frac: float
    # Zone-level consolidation flag; when true the zone scores 3.
    has_consolidation: bool
def aggregate_zone(frames: List[FrameFeatures],
                   min_quality: float = 0.5, min_frames: int = 3,
                   coalescent_percentile: float = 0.90, consolidation_majority: float = 0.25) -> Optional[ZoneFeatures]:
    """Collapse the per-frame features of one zone into a ZoneFeatures.

    Args:
        frames: Per-frame features for the zone.
        min_quality: Frames with ``quality`` below this value are ignored.
        min_frames: Minimum number of surviving frames required to
            produce a result.
        coalescent_percentile: Fraction in [0, 1] selecting which of the
            sorted ``b_area_frac`` values represents the zone. The index
            is ``floor(p * (n - 1))`` clamped to the valid range, so no
            interpolation between neighbouring values takes place.
        consolidation_majority: Minimum fraction of surviving frames that
            must flag a consolidation for the zone to be flagged. Note
            that the default (0.25) is a quarter of the frames, not a
            strict majority.

    Returns:
        The aggregated ZoneFeatures, or ``None`` when no frame survives
        the quality filter or fewer than ``min_frames`` do.
    """
    kept = [f for f in frames if f.quality >= min_quality]
    # The explicit emptiness check matters when min_frames <= 0: the count
    # comparison alone would let an empty list through and every statistic
    # below (median, percentile index, ratio) would raise.
    if not kept or len(kept) < min_frames:
        return None

    b_count_median = statistics.median([f.b_count for f in kept])

    area_vals = sorted(f.b_area_frac for f in kept)
    last = len(area_vals) - 1
    p_idx = max(0, min(last, math.floor(coalescent_percentile * last)))

    consolidation_frac = sum(1 for f in kept if f.has_consolidation) / len(kept)

    return ZoneFeatures(
        # round() is half-to-even: a median of 2.5 becomes 2, 3.5 becomes 4.
        int(round(b_count_median)),
        float(area_vals[p_idx]),
        bool(consolidation_frac >= consolidation_majority),
    )
def zone_lus_score(z: ZoneFeatures, b_count_threshold: int = 3, coalescent_area_threshold: float = 0.40) -> int:
    """Map one zone's features to a score from 0 to 3.

    The criteria are checked from most to least severe and the first
    match wins:

    * 3 -- ``z.has_consolidation`` is truthy
    * 2 -- ``z.b_area_frac`` is at least ``coalescent_area_threshold``
    * 1 -- ``z.b_count`` is at least ``b_count_threshold``
    * 0 -- none of the above
    """
    if z.has_consolidation:
        score = 3
    elif z.b_area_frac >= coalescent_area_threshold:
        score = 2
    elif z.b_count >= b_count_threshold:
        score = 1
    else:
        score = 0
    return score
def patient_lus_score(zones: List[Optional[ZoneFeatures]], *,
                      b_count_threshold: int = 3,
                      coalescent_area_threshold: float = 0.40) -> int:
    """Sum the per-zone scores over every zone that has features.

    Args:
        zones: One entry per zone; ``None`` entries are skipped and add
            nothing to the total.
        b_count_threshold: Forwarded to ``zone_lus_score``.
        coalescent_area_threshold: Forwarded to ``zone_lus_score``.

    Returns:
        The sum of ``zone_lus_score`` over the non-``None`` zones. An
        empty or all-``None`` list yields 0, which is indistinguishable
        from a list of zones that each scored 0.
    """
    # The defaults mirror zone_lus_score's, so calls that omit the
    # thresholds score exactly as before; exposing them lets a caller
    # apply tuned thresholds consistently across all zones.
    return sum(
        zone_lus_score(z, b_count_threshold, coalescent_area_threshold)
        for z in zones
        if z is not None
    )