|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
use std::collections::{HashMap, HashSet, VecDeque}; |
|
|
|
|
|
use crate::core::{Id, Point}; |
|
|
|
|
|
|
|
|
/// How far a consolidation pass runs through the phase pipeline.
///
/// The level is consulted by [`ConsolidationState::next_phase`] to decide
/// after which phase the run jumps to [`ConsolidationPhase::Complete`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsolidationLevel {
    /// Shortest run: finishes right after the `RecomputingCentroids` phase.
    Light,

    /// Finishes after the `Splitting` phase. This is the default level.
    Medium,

    /// Continues past `Splitting` into `Pruning` and `OptimizingLayout`.
    Deep,

    /// Runs every phase. In the state machine visible in this file it follows
    /// the same path as [`ConsolidationLevel::Deep`].
    Full,
}
|
|
|
|
|
impl Default for ConsolidationLevel { |
|
|
fn default() -> Self { |
|
|
ConsolidationLevel::Medium |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)] |
|
|
pub struct ConsolidationConfig { |
|
|
|
|
|
pub level: ConsolidationLevel, |
|
|
|
|
|
|
|
|
pub batch_size: usize, |
|
|
|
|
|
|
|
|
pub merge_threshold: usize, |
|
|
|
|
|
|
|
|
pub split_threshold: usize, |
|
|
|
|
|
|
|
|
|
|
|
pub drift_threshold: f32, |
|
|
|
|
|
|
|
|
pub collect_metrics: bool, |
|
|
} |
|
|
|
|
|
impl Default for ConsolidationConfig { |
|
|
fn default() -> Self { |
|
|
Self { |
|
|
level: ConsolidationLevel::Medium, |
|
|
batch_size: 100, |
|
|
merge_threshold: 3, |
|
|
split_threshold: 100, |
|
|
drift_threshold: 0.01, |
|
|
collect_metrics: true, |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
impl ConsolidationConfig { |
|
|
pub fn light() -> Self { |
|
|
Self { |
|
|
level: ConsolidationLevel::Light, |
|
|
..Default::default() |
|
|
} |
|
|
} |
|
|
|
|
|
pub fn medium() -> Self { |
|
|
Self { |
|
|
level: ConsolidationLevel::Medium, |
|
|
..Default::default() |
|
|
} |
|
|
} |
|
|
|
|
|
pub fn deep() -> Self { |
|
|
Self { |
|
|
level: ConsolidationLevel::Deep, |
|
|
..Default::default() |
|
|
} |
|
|
} |
|
|
|
|
|
pub fn full() -> Self { |
|
|
Self { |
|
|
level: ConsolidationLevel::Full, |
|
|
..Default::default() |
|
|
} |
|
|
} |
|
|
|
|
|
pub fn with_batch_size(mut self, size: usize) -> Self { |
|
|
self.batch_size = size; |
|
|
self |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/// Step of the consolidation pipeline a run is currently in.
///
/// Transitions are driven by `ConsolidationState::next_phase`; the work done
/// inside each phase is up to the `Consolidate` implementor.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsolidationPhase {
    /// No run in progress; the phase a freshly created state starts in.
    Idle,

    /// First active phase, entered when a run is started.
    CollectingLeaves,

    /// Follows `CollectingLeaves`; the last phase of a `Light` run.
    RecomputingCentroids,

    /// Follows `RecomputingCentroids` for every level except `Light`.
    AnalyzingStructure,

    /// Follows `AnalyzingStructure`.
    Merging,

    /// Follows `Merging`; the last phase of a `Medium` run.
    Splitting,

    /// Follows `Splitting` for levels other than `Medium`.
    Pruning,

    /// Follows `Pruning`; the last working phase.
    OptimizingLayout,

    /// Terminal phase; advancing from here stays here.
    Complete,
}
|
|
|
|
|
|
|
|
/// Counters and timings gathered over one consolidation run.
///
/// `ConsolidationState` fills in the drift statistics and the timing fields;
/// the container counters are not written anywhere in this file and are
/// presumably maintained by the `Consolidate` implementor.
#[derive(Clone, Debug, Default)]
pub struct ConsolidationMetrics {
    /// Number of containers processed.
    pub containers_processed: usize,

    /// Number of centroids recomputed.
    pub centroids_recomputed: usize,

    /// Mean of all drifts recorded so far; refreshed on each phase change.
    pub avg_centroid_drift: f32,

    /// Largest single drift recorded so far.
    pub max_centroid_drift: f32,

    /// Number of containers merged.
    pub containers_merged: usize,

    /// Number of containers split.
    pub containers_split: usize,

    /// Number of containers pruned.
    pub containers_pruned: usize,

    /// Time spent per phase in microseconds, keyed by the phase's `Debug`
    /// name (e.g. `"Merging"`).
    pub phase_times_us: HashMap<String, u64>,

    /// Total run time in microseconds; written when the run reaches the
    /// `Complete` phase.
    pub total_time_us: u64,

    /// Tick counter.
    // NOTE(review): presumably the number of `consolidation_tick` calls —
    // not updated in this file, confirm against the implementor.
    pub ticks: usize,
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)] |
|
|
pub struct ConsolidationProgress { |
|
|
|
|
|
pub phase: ConsolidationPhase, |
|
|
|
|
|
|
|
|
pub progress: f32, |
|
|
|
|
|
|
|
|
pub remaining: usize, |
|
|
|
|
|
|
|
|
pub metrics: ConsolidationMetrics, |
|
|
} |
|
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
pub struct ConsolidationState { |
|
|
|
|
|
pub config: ConsolidationConfig, |
|
|
|
|
|
|
|
|
pub phase: ConsolidationPhase, |
|
|
|
|
|
|
|
|
pub metrics: ConsolidationMetrics, |
|
|
|
|
|
|
|
|
pub work_queue: VecDeque<Id>, |
|
|
|
|
|
|
|
|
pub processed: HashSet<Id>, |
|
|
|
|
|
|
|
|
centroid_drifts: Vec<f32>, |
|
|
|
|
|
|
|
|
merge_candidates: Vec<(Id, Id)>, |
|
|
|
|
|
|
|
|
split_candidates: Vec<Id>, |
|
|
|
|
|
|
|
|
phase_start_us: u64, |
|
|
|
|
|
|
|
|
start_us: u64, |
|
|
} |
|
|
|
|
|
impl ConsolidationState { |
|
|
|
|
|
pub fn new(config: ConsolidationConfig) -> Self { |
|
|
let now = std::time::SystemTime::now() |
|
|
.duration_since(std::time::UNIX_EPOCH) |
|
|
.unwrap() |
|
|
.as_micros() as u64; |
|
|
|
|
|
Self { |
|
|
config, |
|
|
phase: ConsolidationPhase::Idle, |
|
|
metrics: ConsolidationMetrics::default(), |
|
|
work_queue: VecDeque::new(), |
|
|
processed: HashSet::new(), |
|
|
centroid_drifts: Vec::new(), |
|
|
merge_candidates: Vec::new(), |
|
|
split_candidates: Vec::new(), |
|
|
phase_start_us: now, |
|
|
start_us: now, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
pub fn start(&mut self) { |
|
|
let now = std::time::SystemTime::now() |
|
|
.duration_since(std::time::UNIX_EPOCH) |
|
|
.unwrap() |
|
|
.as_micros() as u64; |
|
|
|
|
|
self.start_us = now; |
|
|
self.phase_start_us = now; |
|
|
self.phase = ConsolidationPhase::CollectingLeaves; |
|
|
self.metrics = ConsolidationMetrics::default(); |
|
|
self.work_queue.clear(); |
|
|
self.processed.clear(); |
|
|
self.centroid_drifts.clear(); |
|
|
self.merge_candidates.clear(); |
|
|
self.split_candidates.clear(); |
|
|
} |
|
|
|
|
|
|
|
|
pub fn next_phase(&mut self) { |
|
|
let now = std::time::SystemTime::now() |
|
|
.duration_since(std::time::UNIX_EPOCH) |
|
|
.unwrap() |
|
|
.as_micros() as u64; |
|
|
|
|
|
|
|
|
let phase_time = now - self.phase_start_us; |
|
|
let phase_name = format!("{:?}", self.phase); |
|
|
self.metrics.phase_times_us.insert(phase_name, phase_time); |
|
|
|
|
|
|
|
|
if !self.centroid_drifts.is_empty() { |
|
|
self.metrics.avg_centroid_drift = |
|
|
self.centroid_drifts.iter().sum::<f32>() / self.centroid_drifts.len() as f32; |
|
|
} |
|
|
|
|
|
|
|
|
self.phase = match (self.phase, self.config.level) { |
|
|
(ConsolidationPhase::Idle, _) => ConsolidationPhase::CollectingLeaves, |
|
|
|
|
|
(ConsolidationPhase::CollectingLeaves, _) => ConsolidationPhase::RecomputingCentroids, |
|
|
|
|
|
(ConsolidationPhase::RecomputingCentroids, ConsolidationLevel::Light) => { |
|
|
ConsolidationPhase::Complete |
|
|
} |
|
|
(ConsolidationPhase::RecomputingCentroids, _) => { |
|
|
ConsolidationPhase::AnalyzingStructure |
|
|
} |
|
|
|
|
|
(ConsolidationPhase::AnalyzingStructure, _) => ConsolidationPhase::Merging, |
|
|
|
|
|
(ConsolidationPhase::Merging, _) => ConsolidationPhase::Splitting, |
|
|
|
|
|
(ConsolidationPhase::Splitting, ConsolidationLevel::Medium) => { |
|
|
ConsolidationPhase::Complete |
|
|
} |
|
|
(ConsolidationPhase::Splitting, _) => ConsolidationPhase::Pruning, |
|
|
|
|
|
(ConsolidationPhase::Pruning, _) => ConsolidationPhase::OptimizingLayout, |
|
|
|
|
|
(ConsolidationPhase::OptimizingLayout, _) => ConsolidationPhase::Complete, |
|
|
|
|
|
(ConsolidationPhase::Complete, _) => ConsolidationPhase::Complete, |
|
|
}; |
|
|
|
|
|
|
|
|
self.phase_start_us = now; |
|
|
self.work_queue.clear(); |
|
|
self.processed.clear(); |
|
|
|
|
|
|
|
|
if self.phase == ConsolidationPhase::Complete { |
|
|
self.metrics.total_time_us = now - self.start_us; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
pub fn record_drift(&mut self, drift: f32) { |
|
|
self.centroid_drifts.push(drift); |
|
|
if drift > self.metrics.max_centroid_drift { |
|
|
self.metrics.max_centroid_drift = drift; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
pub fn add_merge_candidate(&mut self, a: Id, b: Id) { |
|
|
self.merge_candidates.push((a, b)); |
|
|
} |
|
|
|
|
|
|
|
|
pub fn add_split_candidate(&mut self, id: Id) { |
|
|
self.split_candidates.push(id); |
|
|
} |
|
|
|
|
|
|
|
|
pub fn next_merge(&mut self) -> Option<(Id, Id)> { |
|
|
self.merge_candidates.pop() |
|
|
} |
|
|
|
|
|
|
|
|
pub fn next_split(&mut self) -> Option<Id> { |
|
|
self.split_candidates.pop() |
|
|
} |
|
|
|
|
|
|
|
|
pub fn has_merges(&self) -> bool { |
|
|
!self.merge_candidates.is_empty() |
|
|
} |
|
|
|
|
|
|
|
|
pub fn has_splits(&self) -> bool { |
|
|
!self.split_candidates.is_empty() |
|
|
} |
|
|
|
|
|
|
|
|
pub fn is_complete(&self) -> bool { |
|
|
self.phase == ConsolidationPhase::Complete |
|
|
} |
|
|
|
|
|
|
|
|
pub fn progress(&self) -> ConsolidationProgress { |
|
|
let remaining = self.work_queue.len(); |
|
|
let total = remaining + self.processed.len(); |
|
|
let progress = if total > 0 { |
|
|
self.processed.len() as f32 / total as f32 |
|
|
} else { |
|
|
1.0 |
|
|
}; |
|
|
|
|
|
ConsolidationProgress { |
|
|
phase: self.phase, |
|
|
progress, |
|
|
remaining, |
|
|
metrics: self.metrics.clone(), |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
pub enum ConsolidationTickResult { |
|
|
|
|
|
Continue(ConsolidationProgress), |
|
|
|
|
|
|
|
|
Complete(ConsolidationMetrics), |
|
|
} |
|
|
|
|
|
|
|
|
pub trait Consolidate { |
|
|
|
|
|
fn begin_consolidation(&mut self, config: ConsolidationConfig); |
|
|
|
|
|
|
|
|
|
|
|
fn consolidation_tick(&mut self) -> ConsolidationTickResult; |
|
|
|
|
|
|
|
|
fn consolidate(&mut self, config: ConsolidationConfig) -> ConsolidationMetrics { |
|
|
self.begin_consolidation(config); |
|
|
loop { |
|
|
match self.consolidation_tick() { |
|
|
ConsolidationTickResult::Continue(_) => continue, |
|
|
ConsolidationTickResult::Complete(metrics) => return metrics, |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
fn is_consolidating(&self) -> bool; |
|
|
|
|
|
|
|
|
fn consolidation_progress(&self) -> Option<ConsolidationProgress>; |
|
|
|
|
|
|
|
|
fn cancel_consolidation(&mut self); |
|
|
} |
|
|
|
|
|
|
|
|
pub fn compute_exact_centroid(points: &[Point]) -> Option<Point> { |
|
|
if points.is_empty() { |
|
|
return None; |
|
|
} |
|
|
|
|
|
let dims = points[0].dimensionality(); |
|
|
let mut sum = vec![0.0f32; dims]; |
|
|
|
|
|
for point in points { |
|
|
for (i, &val) in point.dims().iter().enumerate() { |
|
|
sum[i] += val; |
|
|
} |
|
|
} |
|
|
|
|
|
let n = points.len() as f32; |
|
|
let mean: Vec<f32> = sum.iter().map(|s| s / n).collect(); |
|
|
|
|
|
Some(Point::new(mean).normalize()) |
|
|
} |
|
|
|
|
|
|
|
|
pub fn centroid_drift(old: &Point, new: &Point) -> f32 { |
|
|
old.dims() |
|
|
.iter() |
|
|
.zip(new.dims().iter()) |
|
|
.map(|(a, b)| (a - b).powi(2)) |
|
|
.sum::<f32>() |
|
|
.sqrt() |
|
|
} |
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Advances `state` once per entry in `expected`, asserting the phase
    /// reached after each step.
    fn assert_phase_sequence(state: &mut ConsolidationState, expected: &[ConsolidationPhase]) {
        for &phase in expected {
            state.next_phase();
            assert_eq!(state.phase, phase);
        }
    }

    #[test]
    fn test_consolidation_config_levels() {
        let light = ConsolidationConfig::light();
        assert_eq!(light.level, ConsolidationLevel::Light);

        let medium = ConsolidationConfig::medium();
        assert_eq!(medium.level, ConsolidationLevel::Medium);

        let deep = ConsolidationConfig::deep();
        assert_eq!(deep.level, ConsolidationLevel::Deep);

        let full = ConsolidationConfig::full();
        assert_eq!(full.level, ConsolidationLevel::Full);
    }

    #[test]
    fn test_consolidation_state_phases() {
        let config = ConsolidationConfig::light();
        let mut state = ConsolidationState::new(config);

        assert_eq!(state.phase, ConsolidationPhase::Idle);

        state.start();
        assert_eq!(state.phase, ConsolidationPhase::CollectingLeaves);

        // Light runs skip straight to Complete after recomputing centroids.
        assert_phase_sequence(
            &mut state,
            &[
                ConsolidationPhase::RecomputingCentroids,
                ConsolidationPhase::Complete,
            ],
        );
        assert!(state.is_complete());
    }

    #[test]
    fn test_consolidation_state_medium_phases() {
        let config = ConsolidationConfig::medium();
        let mut state = ConsolidationState::new(config);

        state.start();
        assert_eq!(state.phase, ConsolidationPhase::CollectingLeaves);

        // Medium runs stop after splitting.
        assert_phase_sequence(
            &mut state,
            &[
                ConsolidationPhase::RecomputingCentroids,
                ConsolidationPhase::AnalyzingStructure,
                ConsolidationPhase::Merging,
                ConsolidationPhase::Splitting,
                ConsolidationPhase::Complete,
            ],
        );
    }

    #[test]
    fn test_consolidation_state_deep_and_full_phases() {
        for config in [ConsolidationConfig::deep(), ConsolidationConfig::full()] {
            let mut state = ConsolidationState::new(config);

            state.start();
            assert_eq!(state.phase, ConsolidationPhase::CollectingLeaves);

            assert_phase_sequence(
                &mut state,
                &[
                    ConsolidationPhase::RecomputingCentroids,
                    ConsolidationPhase::AnalyzingStructure,
                    ConsolidationPhase::Merging,
                    ConsolidationPhase::Splitting,
                    ConsolidationPhase::Pruning,
                    ConsolidationPhase::OptimizingLayout,
                    ConsolidationPhase::Complete,
                ],
            );
            assert!(state.is_complete());

            // Complete is terminal.
            state.next_phase();
            assert!(state.is_complete());
        }
    }

    #[test]
    fn test_progress_with_no_work_is_full() {
        let state = ConsolidationState::new(ConsolidationConfig::default());
        let snapshot = state.progress();

        assert_eq!(snapshot.phase, ConsolidationPhase::Idle);
        assert_eq!(snapshot.remaining, 0);
        assert!((snapshot.progress - 1.0).abs() < f32::EPSILON);
    }

    #[test]
    fn test_centroid_computation() {
        let points = vec![
            Point::new(vec![1.0, 0.0, 0.0]),
            Point::new(vec![0.0, 1.0, 0.0]),
            Point::new(vec![0.0, 0.0, 1.0]),
        ];

        let centroid = compute_exact_centroid(&points).unwrap();

        // The raw mean is (1/3, 1/3, 1/3); after normalization every
        // component equals 1/sqrt(3).
        let expected_component = (1.0f32 / 3.0).sqrt();
        for dim in centroid.dims() {
            assert!((dim - expected_component).abs() < 0.01);
        }
    }

    #[test]
    fn test_centroid_of_empty_input_is_none() {
        assert!(compute_exact_centroid(&[]).is_none());
    }

    #[test]
    fn test_centroid_drift() {
        let old = Point::new(vec![1.0, 0.0, 0.0]);
        let new = Point::new(vec![0.9, 0.1, 0.0]).normalize();

        let drift = centroid_drift(&old, &new);
        assert!(drift > 0.0);
        assert!(drift < 1.0);
    }

    #[test]
    fn test_drift_recording() {
        let config = ConsolidationConfig::default();
        let mut state = ConsolidationState::new(config);

        state.record_drift(0.05);
        state.record_drift(0.10);
        state.record_drift(0.02);

        assert!((state.metrics.max_centroid_drift - 0.10).abs() < f32::EPSILON);
        assert_eq!(state.centroid_drifts.len(), 3);

        // The average is only refreshed on a phase change.
        state.next_phase();
        let expected_avg = (0.05f32 + 0.10 + 0.02) / 3.0;
        assert!((state.metrics.avg_centroid_drift - expected_avg).abs() < 1e-6);
    }
}
|
|
|