| |
| |
| |
| |
| |
| |
|
|
| use pyo3::prelude::*; |
| use std::collections::HashMap; |
|
|
| |
/// One recorded access: when it happened, which path was touched, and how
/// many bytes were involved.
///
/// The field order mirrors the `(u64, String, u64)` tuples that
/// `AccessGraph::build` consumes.
#[derive(Debug, Clone)]
pub struct AccessEvent {
    /// Time of the access (nanoseconds, going by the field name).
    pub timestamp_ns: u64,
    /// Path that was accessed.
    pub path: String,
    /// Size of the access in bytes.
    pub size_bytes: u64,
}
|
|
| |
/// Directed edge `source_id -> target_id` with running statistics over the
/// time gaps observed between the two nodes.
#[derive(Debug, Clone)]
pub struct CausalEdge {
    /// Id of the node that was accessed first.
    pub source_id: u32,
    /// Id of the node that was accessed afterwards.
    pub target_id: u32,
    /// Number of observations folded into this edge.
    pub count: u32,
    /// Running mean of the observed gaps.
    pub mean_delta_ns: f64,
    /// Until `finalize` runs this holds the running sum of squared
    /// deviations; afterwards it holds the sample standard deviation.
    pub std_delta_ns: f64,
    /// `count * consistency`, filled in by `finalize`.
    pub weight: f64,
}

impl CausalEdge {
    /// Creates an edge with no observations; every statistic starts at zero.
    fn new(source_id: u32, target_id: u32) -> Self {
        Self {
            count: 0,
            mean_delta_ns: 0.0,
            std_delta_ns: 0.0,
            weight: 0.0,
            source_id,
            target_id,
        }
    }

    /// Folds one gap into the running mean and the running sum of squared
    /// deviations (Welford's online update).
    fn add_observation(&mut self, delta_ns: f64) {
        let diff_from_old_mean = delta_ns - self.mean_delta_ns;
        self.count += 1;
        self.mean_delta_ns += diff_from_old_mean / self.count as f64;

        // The second factor deliberately uses the *updated* mean.
        let diff_from_new_mean = delta_ns - self.mean_delta_ns;
        self.std_delta_ns += diff_from_old_mean * diff_from_new_mean;
    }

    /// Converts the accumulated sum of squares into a sample standard
    /// deviation and derives the edge weight.
    ///
    /// Must run exactly once, after the last `add_observation`: a second call
    /// would treat the standard deviation as a sum of squares.
    fn finalize(&mut self) {
        self.std_delta_ns = match self.count {
            // Fewer than two samples: no spread to report.
            0 | 1 => 0.0,
            n => (self.std_delta_ns / (n as f64 - 1.0)).sqrt(),
        };

        // Spread relative to the mean; the mean is floored at 1.0 so tiny
        // means cannot blow the ratio up.
        let relative_spread = self.std_delta_ns / self.mean_delta_ns.max(1.0);
        let consistency = 1.0 / (1.0 + relative_spread);
        self.weight = self.count as f64 * consistency;
    }
}
|
|
| |
/// A group of nodes that `AccessGraph::discover_clusters` found to be
/// connected by sufficiently frequent co-access.
#[derive(Debug, Clone)]
pub struct Cluster {
    /// Cluster id; also the cluster's index in `AccessGraph::clusters`.
    pub id: u32,
    /// Ids of the nodes belonging to this cluster, in traversal order.
    pub member_ids: Vec<u32>,
}
|
|
| |
/// Per-path bookkeeping accumulated while the graph is built.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// Node id; equal to this node's index in the graph's node list.
    pub id: u32,
    /// Path this node stands for.
    pub path: String,
    /// Number of events seen for this path.
    pub access_count: u32,
    /// Sum of the event sizes seen for this path.
    pub total_bytes: u64,
    /// Smallest event timestamp seen (starts at `u64::MAX` for a new node).
    pub first_access_ns: u64,
    /// Largest event timestamp seen (starts at `0` for a new node).
    pub last_access_ns: u64,
}
|
|
| |
| |
| |
| #[pyclass] |
| pub struct AccessGraph { |
| |
| path_to_id: HashMap<String, u32>, |
| |
| nodes: Vec<NodeInfo>, |
| |
| edges: HashMap<(u32, u32), CausalEdge>, |
| |
| pub clusters: Vec<Cluster>, |
| |
| causal_window_ns: u64, |
| |
| cluster_threshold: f64, |
| |
| built: bool, |
| |
| successors: Vec<Vec<(u32, f64, f64)>>, |
| |
| cluster_map: Vec<Option<u32>>, |
| } |
|
|
| #[pymethods] |
| impl AccessGraph { |
| #[new] |
| #[pyo3(signature = (causal_window_ns=5_000_000, cluster_threshold=0.7))] |
| pub fn new(causal_window_ns: u64, cluster_threshold: f64) -> Self { |
| Self { |
| path_to_id: HashMap::new(), |
| nodes: Vec::new(), |
| edges: HashMap::new(), |
| clusters: Vec::new(), |
| causal_window_ns, |
| cluster_threshold, |
| built: false, |
| successors: Vec::new(), |
| cluster_map: Vec::new(), |
| } |
| } |
|
|
| |
| |
| |
| pub fn build(&mut self, events: Vec<(u64, String, u64)>) { |
| if events.is_empty() { |
| return; |
| } |
|
|
| |
| for (ts, path, size) in &events { |
| let id = self.get_or_create_node(path); |
| let node = &mut self.nodes[id as usize]; |
| node.access_count += 1; |
| node.total_bytes += size; |
| if *ts < node.first_access_ns { |
| node.first_access_ns = *ts; |
| } |
| if *ts > node.last_access_ns { |
| node.last_access_ns = *ts; |
| } |
| } |
|
|
| |
| let n = events.len(); |
| for i in 0..n { |
| let (ts_i, ref path_i, _) = events[i]; |
| let id_i = self.path_to_id[path_i]; |
|
|
| for j in (i + 1)..n { |
| let (ts_j, ref path_j, _) = events[j]; |
| let delta = ts_j - ts_i; |
|
|
| if delta > self.causal_window_ns { |
| break; |
| } |
|
|
| let id_j = self.path_to_id[path_j]; |
| if id_i == id_j { |
| continue; |
| } |
|
|
| let edge = self.edges |
| .entry((id_i, id_j)) |
| .or_insert_with(|| CausalEdge::new(id_i, id_j)); |
| edge.add_observation(delta as f64); |
| } |
| } |
|
|
| |
| for edge in self.edges.values_mut() { |
| edge.finalize(); |
| } |
|
|
| |
| self.discover_clusters(); |
|
|
| |
| self.build_successors(); |
|
|
| self.built = true; |
| } |
|
|
| |
| pub fn node_count(&self) -> usize { |
| self.nodes.len() |
| } |
|
|
| |
| pub fn edge_count(&self) -> usize { |
| self.edges.len() |
| } |
|
|
| |
| fn strong_edge_count(&self, min_weight: f64) -> usize { |
| self.edges.values().filter(|e| e.weight >= min_weight).count() |
| } |
|
|
| |
| pub fn cluster_count(&self) -> usize { |
| self.clusters.len() |
| } |
|
|
| |
| pub fn get_node_stats(&self) -> Vec<(String, u32)> { |
| self.nodes.iter() |
| .map(|n| (n.path.clone(), n.access_count)) |
| .collect() |
| } |
|
|
| |
| fn get_top_edges(&self, limit: usize) -> Vec<(String, String, u32, f64, f64)> { |
| let mut edges: Vec<_> = self.edges.values().collect(); |
| edges.sort_by(|a, b| b.weight.partial_cmp(&a.weight).unwrap()); |
| edges.iter() |
| .take(limit) |
| .map(|e| { |
| let src = &self.nodes[e.source_id as usize].path; |
| let tgt = &self.nodes[e.target_id as usize].path; |
| (src.clone(), tgt.clone(), e.count, e.mean_delta_ns / 1_000_000.0, e.weight) |
| }) |
| .collect() |
| } |
|
|
| |
| fn is_built(&self) -> bool { |
| self.built |
| } |
| } |
|
|
| |
| impl AccessGraph { |
| fn get_or_create_node(&mut self, path: &str) -> u32 { |
| if let Some(&id) = self.path_to_id.get(path) { |
| return id; |
| } |
| let id = self.nodes.len() as u32; |
| self.path_to_id.insert(path.to_string(), id); |
| self.nodes.push(NodeInfo { |
| id, |
| path: path.to_string(), |
| access_count: 0, |
| total_bytes: 0, |
| first_access_ns: u64::MAX, |
| last_access_ns: 0, |
| }); |
| id |
| } |
|
|
| fn discover_clusters(&mut self) { |
| let n = self.nodes.len(); |
| if n < 2 { |
| return; |
| } |
|
|
| |
| let mut cocount: HashMap<(u32, u32), u32> = HashMap::new(); |
| for ((src, tgt), edge) in &self.edges { |
| *cocount.entry((*src, *tgt)).or_default() += edge.count; |
| *cocount.entry((*tgt, *src)).or_default() += edge.count; |
| } |
|
|
| |
| let mut adjacency: Vec<Vec<u32>> = vec![Vec::new(); n]; |
| for i in 0..n { |
| for j in (i + 1)..n { |
| let co = cocount.get(&(i as u32, j as u32)).copied().unwrap_or(0); |
| let min_count = self.nodes[i].access_count |
| .min(self.nodes[j].access_count) |
| .max(1); |
| let ratio = co as f64 / min_count as f64; |
| if ratio >= self.cluster_threshold { |
| adjacency[i].push(j as u32); |
| adjacency[j].push(i as u32); |
| } |
| } |
| } |
|
|
| |
| let mut visited = vec![false; n]; |
| let mut cluster_id: u32 = 0; |
|
|
| |
| self.cluster_map = vec![None; n]; |
|
|
| for start in 0..n { |
| if visited[start] || adjacency[start].is_empty() { |
| continue; |
| } |
|
|
| let mut component = Vec::new(); |
| let mut queue = vec![start]; |
|
|
| while let Some(node) = queue.pop() { |
| if visited[node] { |
| continue; |
| } |
| visited[node] = true; |
| component.push(node as u32); |
|
|
| for &neighbor in &adjacency[node] { |
| if !visited[neighbor as usize] { |
| queue.push(neighbor as usize); |
| } |
| } |
| } |
|
|
| if component.len() >= 2 { |
| for &member_id in &component { |
| self.cluster_map[member_id as usize] = Some(cluster_id); |
| } |
| self.clusters.push(Cluster { |
| id: cluster_id, |
| member_ids: component, |
| }); |
| cluster_id += 1; |
| } |
| } |
| } |
|
|
| fn build_successors(&mut self) { |
| let n = self.nodes.len(); |
| let max_weight = self.edges.values() |
| .map(|e| e.weight) |
| .fold(0.0f64, f64::max) |
| .max(1.0); |
|
|
| self.successors = vec![Vec::new(); n]; |
|
|
| for edge in self.edges.values() { |
| if edge.weight < 1.0 { |
| continue; |
| } |
| let norm_weight = edge.weight / max_weight; |
| self.successors[edge.source_id as usize].push(( |
| edge.target_id, |
| norm_weight, |
| edge.mean_delta_ns / 1_000_000.0, |
| )); |
| } |
|
|
| |
| for succs in &mut self.successors { |
| succs.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); |
| succs.truncate(10); |
| } |
| } |
|
|
| |
| pub fn get_successors(&self, path: &str) -> &[(u32, f64, f64)] { |
| if let Some(&id) = self.path_to_id.get(path) { |
| &self.successors[id as usize] |
| } else { |
| &[] |
| } |
| } |
|
|
| |
| pub fn get_cluster_members(&self, path: &str) -> Option<&[u32]> { |
| let &id = self.path_to_id.get(path)?; |
| let cluster_id = self.cluster_map[id as usize]?; |
| Some(&self.clusters[cluster_id as usize].member_ids) |
| } |
|
|
| |
| pub fn get_path(&self, id: u32) -> Option<&str> { |
| self.nodes.get(id as usize).map(|n| n.path.as_str()) |
| } |
|
|
| |
| pub fn get_id(&self, path: &str) -> Option<u32> { |
| self.path_to_id.get(path).copied() |
| } |
| } |
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Repeats `pattern` for `rounds` rounds spaced `period_ns` apart.
    ///
    /// Each pattern entry is `(offset_ns, path)`; every generated event has a
    /// size of 100 bytes. Events come out in chronological order as long as
    /// the pattern offsets are increasing and smaller than `period_ns`.
    fn repeated(rounds: u64, period_ns: u64, pattern: &[(u64, &str)]) -> Vec<(u64, String, u64)> {
        let mut events = Vec::with_capacity(rounds as usize * pattern.len());
        for round in 0..rounds {
            let base = round * period_ns;
            for &(offset_ns, path) in pattern {
                events.push((base + offset_ns, path.to_string(), 100));
            }
        }
        events
    }

    /// A -> B -> C repeated ten times yields three nodes and some edges.
    #[test]
    fn test_build_simple_graph() {
        let events = repeated(
            10,
            3_000_000,
            &[(0, "A"), (1_000_000, "B"), (2_000_000, "C")],
        );

        let mut graph = AccessGraph::new(5_000_000, 0.7);
        graph.build(events);

        assert_eq!(graph.node_count(), 3);
        assert!(graph.edge_count() > 0);
        assert!(graph.is_built());

        let top = graph.get_top_edges(5);
        assert!(!top.is_empty());
        println!("Top edges: {:?}", top);
    }

    /// Two bursts (X, Y, Z and P, Q) spaced further apart than the causal
    /// window must end up in separate clusters.
    #[test]
    fn test_cluster_discovery() {
        let events = repeated(
            30,
            20_000_000,
            &[
                (0, "X"),
                (100_000, "Y"),
                (200_000, "Z"),
                (10_000_000, "P"),
                (10_100_000, "Q"),
            ],
        );

        let mut graph = AccessGraph::new(3_000_000, 0.6);
        graph.build(events);

        assert!(graph.cluster_count() >= 2, "Should find at least 2 clusters, found {}", graph.cluster_count());
    }

    /// `dst` consistently follows `src`, so it must be `src`'s top successor.
    #[test]
    fn test_successor_lookup() {
        let events = repeated(50, 5_000_000, &[(0, "src"), (1_000_000, "dst")]);

        let mut graph = AccessGraph::new(5_000_000, 0.7);
        graph.build(events);

        let succs = graph.get_successors("src");
        assert!(!succs.is_empty(), "src should have successors");
        assert_eq!(graph.get_path(succs[0].0), Some("dst"));
    }
}
|
|