| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| use std::collections::{HashMap, HashSet, VecDeque}; |
|
|
| use crate::core::{Id, Point}; |
|
|
| |
/// How much restructuring work a consolidation run performs.
///
/// The level decides where the phase pipeline short-circuits to `Complete`
/// (see `ConsolidationState::next_phase`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsolidationLevel {
    /// Stop after recomputing centroids (no structural changes).
    Light,

    /// Also analyze structure and perform merges/splits; stop before pruning.
    Medium,

    /// Run every phase, including pruning and layout optimization.
    Deep,

    /// Run every phase. NOTE(review): in the visible phase table, `Full`
    /// follows the same path as `Deep` — confirm whether implementors treat
    /// it differently per-phase.
    Full,
}
|
|
| impl Default for ConsolidationLevel { |
| fn default() -> Self { |
| ConsolidationLevel::Medium |
| } |
| } |
|
|
| |
/// Tuning parameters for a consolidation run.
#[derive(Debug, Clone)]
pub struct ConsolidationConfig {
    /// How much restructuring work to perform (which phases run).
    pub level: ConsolidationLevel,

    /// Number of items processed per batch/tick.
    /// NOTE(review): exact batching semantics live in the `Consolidate`
    /// implementor — confirm there.
    pub batch_size: usize,

    /// Size threshold below which containers become merge candidates.
    /// NOTE(review): inferred from the name — verify at the call sites
    /// that populate merge candidates.
    pub merge_threshold: usize,

    /// Size threshold above which containers become split candidates.
    /// NOTE(review): inferred from the name — verify at call sites.
    pub split_threshold: usize,

    /// Centroid drift magnitude considered significant.
    /// NOTE(review): consumers of this threshold are outside this file.
    pub drift_threshold: f32,

    /// Whether to record per-phase metrics during the run.
    pub collect_metrics: bool,
}
|
|
| impl Default for ConsolidationConfig { |
| fn default() -> Self { |
| Self { |
| level: ConsolidationLevel::Medium, |
| batch_size: 100, |
| merge_threshold: 3, |
| split_threshold: 100, |
| drift_threshold: 0.01, |
| collect_metrics: true, |
| } |
| } |
| } |
|
|
| impl ConsolidationConfig { |
| pub fn light() -> Self { |
| Self { |
| level: ConsolidationLevel::Light, |
| ..Default::default() |
| } |
| } |
|
|
| pub fn medium() -> Self { |
| Self { |
| level: ConsolidationLevel::Medium, |
| ..Default::default() |
| } |
| } |
|
|
| pub fn deep() -> Self { |
| Self { |
| level: ConsolidationLevel::Deep, |
| ..Default::default() |
| } |
| } |
|
|
| pub fn full() -> Self { |
| Self { |
| level: ConsolidationLevel::Full, |
| ..Default::default() |
| } |
| } |
|
|
| pub fn with_batch_size(mut self, size: usize) -> Self { |
| self.batch_size = size; |
| self |
| } |
| } |
|
|
| |
/// The phases of a consolidation run, in pipeline order
/// (see `ConsolidationState::next_phase` for the exact transitions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsolidationPhase {
    /// No run in progress.
    Idle,

    /// Gathering leaf containers into the work queue.
    CollectingLeaves,

    /// Recomputing container centroids (drift is recorded here).
    RecomputingCentroids,

    /// Analyzing structure to find merge/split candidates.
    AnalyzingStructure,

    /// Applying queued merges.
    Merging,

    /// Applying queued splits.
    Splitting,

    /// Removing dead/empty containers (Deep/Full levels only).
    Pruning,

    /// Optimizing physical layout (Deep/Full levels only).
    OptimizingLayout,

    /// Run finished; `total_time_us` is recorded on entry to this phase.
    Complete,
}
|
|
| |
/// Counters and timings accumulated over one consolidation run.
#[derive(Debug, Clone, Default)]
pub struct ConsolidationMetrics {
    /// Total containers visited during the run.
    pub containers_processed: usize,

    /// Number of centroids recomputed.
    pub centroids_recomputed: usize,

    /// Mean of the recorded drift samples (recomputed at each phase
    /// transition in `next_phase`).
    pub avg_centroid_drift: f32,

    /// Largest single drift sample seen (updated by `record_drift`).
    pub max_centroid_drift: f32,

    /// Containers merged away.
    pub containers_merged: usize,

    /// Containers split.
    pub containers_split: usize,

    /// Containers pruned.
    pub containers_pruned: usize,

    /// Wall-clock duration of each completed phase in microseconds,
    /// keyed by the phase's `Debug` name (e.g. "Merging").
    pub phase_times_us: HashMap<String, u64>,

    /// Total wall-clock duration of the run in microseconds
    /// (set when the run reaches `Complete`).
    pub total_time_us: u64,

    /// Number of ticks the run consumed.
    /// NOTE(review): not updated in this file — maintained by the
    /// `Consolidate` implementor; confirm there.
    pub ticks: usize,
}
|
|
| |
/// Snapshot of an in-flight consolidation run.
#[derive(Debug, Clone)]
pub struct ConsolidationProgress {
    /// Phase currently executing.
    pub phase: ConsolidationPhase,

    /// Fraction of the current phase's work completed, in [0.0, 1.0]
    /// (1.0 when the phase has no queued work).
    pub progress: f32,

    /// Items still pending in the current phase's work queue.
    pub remaining: usize,

    /// Metrics accumulated so far (cloned snapshot).
    pub metrics: ConsolidationMetrics,
}
|
|
| |
/// State machine driving an incremental consolidation run.
///
/// Implementors of `Consolidate` advance it with `start`/`next_phase` and
/// feed it work via the candidate queues.
#[derive(Debug)]
pub struct ConsolidationState {
    /// Parameters for this run.
    pub config: ConsolidationConfig,

    /// Current phase of the pipeline.
    pub phase: ConsolidationPhase,

    /// Metrics accumulated so far.
    pub metrics: ConsolidationMetrics,

    /// Items still to process in the current phase (cleared on transition).
    pub work_queue: VecDeque<Id>,

    /// Items already processed in the current phase (cleared on transition).
    pub processed: HashSet<Id>,

    /// Raw drift samples, used to compute `metrics.avg_centroid_drift`.
    centroid_drifts: Vec<f32>,

    /// Pending merge pairs, consumed LIFO via `next_merge`.
    merge_candidates: Vec<(Id, Id)>,

    /// Pending split targets, consumed LIFO via `next_split`.
    split_candidates: Vec<Id>,

    /// Unix-epoch timestamp (µs) when the current phase began.
    phase_start_us: u64,

    /// Unix-epoch timestamp (µs) when the run began.
    start_us: u64,
}
|
|
| impl ConsolidationState { |
| |
| pub fn new(config: ConsolidationConfig) -> Self { |
| let now = std::time::SystemTime::now() |
| .duration_since(std::time::UNIX_EPOCH) |
| .unwrap() |
| .as_micros() as u64; |
|
|
| Self { |
| config, |
| phase: ConsolidationPhase::Idle, |
| metrics: ConsolidationMetrics::default(), |
| work_queue: VecDeque::new(), |
| processed: HashSet::new(), |
| centroid_drifts: Vec::new(), |
| merge_candidates: Vec::new(), |
| split_candidates: Vec::new(), |
| phase_start_us: now, |
| start_us: now, |
| } |
| } |
|
|
| |
| pub fn start(&mut self) { |
| let now = std::time::SystemTime::now() |
| .duration_since(std::time::UNIX_EPOCH) |
| .unwrap() |
| .as_micros() as u64; |
|
|
| self.start_us = now; |
| self.phase_start_us = now; |
| self.phase = ConsolidationPhase::CollectingLeaves; |
| self.metrics = ConsolidationMetrics::default(); |
| self.work_queue.clear(); |
| self.processed.clear(); |
| self.centroid_drifts.clear(); |
| self.merge_candidates.clear(); |
| self.split_candidates.clear(); |
| } |
|
|
| |
| pub fn next_phase(&mut self) { |
| let now = std::time::SystemTime::now() |
| .duration_since(std::time::UNIX_EPOCH) |
| .unwrap() |
| .as_micros() as u64; |
|
|
| |
| let phase_time = now - self.phase_start_us; |
| let phase_name = format!("{:?}", self.phase); |
| self.metrics.phase_times_us.insert(phase_name, phase_time); |
|
|
| |
| if !self.centroid_drifts.is_empty() { |
| self.metrics.avg_centroid_drift = |
| self.centroid_drifts.iter().sum::<f32>() / self.centroid_drifts.len() as f32; |
| } |
|
|
| |
| self.phase = match (self.phase, self.config.level) { |
| (ConsolidationPhase::Idle, _) => ConsolidationPhase::CollectingLeaves, |
|
|
| (ConsolidationPhase::CollectingLeaves, _) => ConsolidationPhase::RecomputingCentroids, |
|
|
| (ConsolidationPhase::RecomputingCentroids, ConsolidationLevel::Light) => { |
| ConsolidationPhase::Complete |
| } |
| (ConsolidationPhase::RecomputingCentroids, _) => { |
| ConsolidationPhase::AnalyzingStructure |
| } |
|
|
| (ConsolidationPhase::AnalyzingStructure, _) => ConsolidationPhase::Merging, |
|
|
| (ConsolidationPhase::Merging, _) => ConsolidationPhase::Splitting, |
|
|
| (ConsolidationPhase::Splitting, ConsolidationLevel::Medium) => { |
| ConsolidationPhase::Complete |
| } |
| (ConsolidationPhase::Splitting, _) => ConsolidationPhase::Pruning, |
|
|
| (ConsolidationPhase::Pruning, _) => ConsolidationPhase::OptimizingLayout, |
|
|
| (ConsolidationPhase::OptimizingLayout, _) => ConsolidationPhase::Complete, |
|
|
| (ConsolidationPhase::Complete, _) => ConsolidationPhase::Complete, |
| }; |
|
|
| |
| self.phase_start_us = now; |
| self.work_queue.clear(); |
| self.processed.clear(); |
|
|
| |
| if self.phase == ConsolidationPhase::Complete { |
| self.metrics.total_time_us = now - self.start_us; |
| } |
| } |
|
|
| |
| pub fn record_drift(&mut self, drift: f32) { |
| self.centroid_drifts.push(drift); |
| if drift > self.metrics.max_centroid_drift { |
| self.metrics.max_centroid_drift = drift; |
| } |
| } |
|
|
| |
| pub fn add_merge_candidate(&mut self, a: Id, b: Id) { |
| self.merge_candidates.push((a, b)); |
| } |
|
|
| |
| pub fn add_split_candidate(&mut self, id: Id) { |
| self.split_candidates.push(id); |
| } |
|
|
| |
| pub fn next_merge(&mut self) -> Option<(Id, Id)> { |
| self.merge_candidates.pop() |
| } |
|
|
| |
| pub fn next_split(&mut self) -> Option<Id> { |
| self.split_candidates.pop() |
| } |
|
|
| |
| pub fn has_merges(&self) -> bool { |
| !self.merge_candidates.is_empty() |
| } |
|
|
| |
| pub fn has_splits(&self) -> bool { |
| !self.split_candidates.is_empty() |
| } |
|
|
| |
| pub fn is_complete(&self) -> bool { |
| self.phase == ConsolidationPhase::Complete |
| } |
|
|
| |
| pub fn progress(&self) -> ConsolidationProgress { |
| let remaining = self.work_queue.len(); |
| let total = remaining + self.processed.len(); |
| let progress = if total > 0 { |
| self.processed.len() as f32 / total as f32 |
| } else { |
| 1.0 |
| }; |
|
|
| ConsolidationProgress { |
| phase: self.phase, |
| progress, |
| remaining, |
| metrics: self.metrics.clone(), |
| } |
| } |
| } |
|
|
| |
/// Outcome of a single `Consolidate::consolidation_tick` call.
#[derive(Debug)]
pub enum ConsolidationTickResult {
    /// More work remains; carries a progress snapshot.
    Continue(ConsolidationProgress),

    /// The run finished; carries the final metrics.
    Complete(ConsolidationMetrics),
}
|
|
| |
| pub trait Consolidate { |
| |
| fn begin_consolidation(&mut self, config: ConsolidationConfig); |
|
|
| |
| |
| fn consolidation_tick(&mut self) -> ConsolidationTickResult; |
|
|
| |
| fn consolidate(&mut self, config: ConsolidationConfig) -> ConsolidationMetrics { |
| self.begin_consolidation(config); |
| loop { |
| match self.consolidation_tick() { |
| ConsolidationTickResult::Continue(_) => continue, |
| ConsolidationTickResult::Complete(metrics) => return metrics, |
| } |
| } |
| } |
|
|
| |
| fn is_consolidating(&self) -> bool; |
|
|
| |
| fn consolidation_progress(&self) -> Option<ConsolidationProgress>; |
|
|
| |
| fn cancel_consolidation(&mut self); |
| } |
|
|
| |
| pub fn compute_exact_centroid(points: &[Point]) -> Option<Point> { |
| if points.is_empty() { |
| return None; |
| } |
|
|
| let dims = points[0].dimensionality(); |
| let mut sum = vec![0.0f32; dims]; |
|
|
| for point in points { |
| for (i, &val) in point.dims().iter().enumerate() { |
| sum[i] += val; |
| } |
| } |
|
|
| let n = points.len() as f32; |
| let mean: Vec<f32> = sum.iter().map(|s| s / n).collect(); |
|
|
| Some(Point::new(mean).normalize()) |
| } |
|
|
| |
| pub fn centroid_drift(old: &Point, new: &Point) -> f32 { |
| old.dims() |
| .iter() |
| .zip(new.dims().iter()) |
| .map(|(a, b)| (a - b).powi(2)) |
| .sum::<f32>() |
| .sqrt() |
| } |
|
|
#[cfg(test)]
mod tests {
    use super::*;

    // Each named constructor sets its corresponding level.
    #[test]
    fn test_consolidation_config_levels() {
        let light = ConsolidationConfig::light();
        assert_eq!(light.level, ConsolidationLevel::Light);

        let medium = ConsolidationConfig::medium();
        assert_eq!(medium.level, ConsolidationLevel::Medium);

        let deep = ConsolidationConfig::deep();
        assert_eq!(deep.level, ConsolidationLevel::Deep);

        let full = ConsolidationConfig::full();
        assert_eq!(full.level, ConsolidationLevel::Full);
    }

    // Light runs short-circuit to Complete right after centroid recompute.
    #[test]
    fn test_consolidation_state_phases() {
        let config = ConsolidationConfig::light();
        let mut state = ConsolidationState::new(config);

        assert_eq!(state.phase, ConsolidationPhase::Idle);

        state.start();
        assert_eq!(state.phase, ConsolidationPhase::CollectingLeaves);

        state.next_phase();
        assert_eq!(state.phase, ConsolidationPhase::RecomputingCentroids);

        // Light level: RecomputingCentroids -> Complete.
        state.next_phase();
        assert_eq!(state.phase, ConsolidationPhase::Complete);
        assert!(state.is_complete());
    }

    // Medium runs continue through Merging/Splitting, then complete
    // (skipping Pruning and OptimizingLayout).
    #[test]
    fn test_consolidation_state_medium_phases() {
        let config = ConsolidationConfig::medium();
        let mut state = ConsolidationState::new(config);

        state.start();
        assert_eq!(state.phase, ConsolidationPhase::CollectingLeaves);

        state.next_phase();
        assert_eq!(state.phase, ConsolidationPhase::RecomputingCentroids);

        state.next_phase();
        assert_eq!(state.phase, ConsolidationPhase::AnalyzingStructure);

        state.next_phase();
        assert_eq!(state.phase, ConsolidationPhase::Merging);

        state.next_phase();
        assert_eq!(state.phase, ConsolidationPhase::Splitting);

        // Medium level: Splitting -> Complete.
        state.next_phase();
        assert_eq!(state.phase, ConsolidationPhase::Complete);
    }

    // The centroid of the three unit basis vectors, after normalization,
    // has equal components 1/sqrt(3) in every dimension.
    #[test]
    fn test_centroid_computation() {
        let points = vec![
            Point::new(vec![1.0, 0.0, 0.0]),
            Point::new(vec![0.0, 1.0, 0.0]),
            Point::new(vec![0.0, 0.0, 1.0]),
        ];

        let centroid = compute_exact_centroid(&points).unwrap();

        let expected_unnorm = (1.0f32 / 3.0).sqrt();
        for dim in centroid.dims() {
            assert!((dim - expected_unnorm).abs() < 0.01);
        }
    }

    // Drift between nearby unit vectors is a small positive distance.
    #[test]
    fn test_centroid_drift() {
        let old = Point::new(vec![1.0, 0.0, 0.0]);
        let new = Point::new(vec![0.9, 0.1, 0.0]).normalize();

        let drift = centroid_drift(&old, &new);
        assert!(drift > 0.0);
        assert!(drift < 1.0);
    }

    // record_drift tracks both the raw sample list and the running max.
    #[test]
    fn test_drift_recording() {
        let config = ConsolidationConfig::default();
        let mut state = ConsolidationState::new(config);

        state.record_drift(0.05);
        state.record_drift(0.10);
        state.record_drift(0.02);

        assert_eq!(state.metrics.max_centroid_drift, 0.10);
        assert_eq!(state.centroid_drifts.len(), 3);
    }
}
|
|