class AdaptiveVectorSystem:
    """Derive one representative vector from a set of vectors and score the fit.

    Two strategies are offered by :meth:`predict_point`:

    * ``'global'``  - the centroid of every input vector.
    * ``'cluster'`` - the centroid of the largest agglomerative cluster,
      falling back to ``'global'`` when no cluster is large enough.
    """

    # Fraction of the mean pairwise distance used as the clustering cut-off.
    THRESHOLD_RATIO = 0.65
    # Clusters with fewer members than this are treated as noise.
    MIN_CLUSTER_SIZE = 3

    def _calculate_score(self, target, constituents):
        """Return a convergence score in ``[-1.0, 10.0]`` for *target*.

        The score is ``10 - (6 * cv + 0.1 * mean_dist)`` where ``mean_dist``
        is the mean Euclidean distance from *target* to each row of
        *constituents* and ``cv`` is the standard deviation of those
        distances divided by ``mean_dist``.  Only the ``cv`` term is
        scale-invariant; the ``0.1 * mean_dist`` term grows with the raw
        scale of the data.

        Args:
            target: 1-D vector the constituents are measured against.
            constituents: 2-D array-like, one vector per row.

        Returns:
            The clamped score as a ``float``.  ``-1.0`` is returned when
            *constituents* is empty or when the score is not finite (for
            example because the input contains NaN).
        """
        constituents = np.asarray(constituents, dtype=float)
        if len(constituents) == 0:
            return -1.0

        target = np.asarray(target, dtype=float)
        dists = np.linalg.norm(constituents - target, axis=1)
        mean_dist = np.mean(dists)
        std_dev = np.std(dists)

        # The epsilon keeps the ratio defined when every point sits on target.
        variation_coefficient = std_dev / (mean_dist + 1e-9)
        penalty = variation_coefficient * 6.0 + mean_dist * 0.1
        score = 10.0 - penalty

        # min()/max() silently turn NaN into 10.0, which would report a
        # perfect score for garbage input, so reject non-finite values first.
        if not np.isfinite(score):
            return -1.0
        return float(max(-1.0, min(10.0, score)))

    def predict_point(self, vectors, mode='global'):
        """Compute a representative point for *vectors*.

        Args:
            vectors: 2-D array-like, one vector per row.
            mode: ``'global'`` to average every vector, or ``'cluster'`` to
                average only the largest cluster found by Ward-linkage
                agglomerative clustering with a distance threshold of
                ``THRESHOLD_RATIO`` times the mean pairwise distance.

        Returns:
            A tuple ``(center, score, used_vectors)``: the centroid, its
            score from :meth:`_calculate_score`, and the rows that were
            averaged to produce it.

        Raises:
            ValueError: If *mode* is neither ``'global'`` nor ``'cluster'``.
        """
        data = np.array(vectors)

        if mode == 'global':
            center = np.mean(data, axis=0)
            return center, self._calculate_score(center, data), data

        if mode != 'cluster':
            raise ValueError(
                f"Unknown mode {mode!r}; expected 'global' or 'cluster'"
            )

        # Too few rows can never form a non-noise cluster (and fewer than two
        # cannot be clustered at all), so go straight to the fallback.
        if len(data) < self.MIN_CLUSTER_SIZE:
            return self.predict_point(data, mode='global')

        # Mean of the strict upper triangle = mean distance between distinct
        # pairs; this sets the "scale" of the data.
        dist_matrix = euclidean_distances(data, data)
        pair_dists = dist_matrix[np.triu_indices(len(data), k=1)]
        dynamic_thresh = np.mean(pair_dists) * self.THRESHOLD_RATIO

        clusterer = AgglomerativeClustering(
            n_clusters=None,
            metric='euclidean',
            linkage='ward',
            distance_threshold=dynamic_thresh,
        )
        labels = clusterer.fit_predict(data)

        unique_labels, counts = np.unique(labels, return_counts=True)

        # If even the biggest cluster is noise-sized, treat all rows as one group.
        if counts.max() < self.MIN_CLUSTER_SIZE:
            return self.predict_point(data, mode='global')

        # argmax returns the first maximum, i.e. the lowest label on ties.
        best_label = unique_labels[np.argmax(counts)]

        cluster_vectors = data[labels == best_label]
        center = np.mean(cluster_vectors, axis=0)
        score = self._calculate_score(center, cluster_vectors)
        return center, score, cluster_vectors
def _demo_vectors(scenario, rng):
    """Build the synthetic 128-dimensional demo data set for *scenario*."""
    if scenario == 'split':
        # Two dense islands: per-coordinate means 0 and 8, i.e. roughly
        # sqrt(128 * 8**2) ~= 90.5 units apart in 128-D.
        island_a = rng.normal(0, 0.5, (100, 128))
        island_b = rng.normal(8, 0.5, (100, 128))
        # A few uniformly scattered outliers.
        noise = rng.uniform(-5, 15, (10, 128))
        return np.vstack([island_a, island_b, noise])
    # Any other scenario: a single Gaussian blob.
    return rng.normal(0, 1.0, (50, 128))


def _rows_in(data, subset):
    """Return a boolean mask of the rows of *data* that also occur in *subset*."""
    # Byte-exact row keys give an O(n + m) lookup and mark every duplicate row.
    subset = np.asarray(subset, dtype=data.dtype)
    wanted = {row.tobytes() for row in subset}
    return np.fromiter(
        (row.tobytes() in wanted for row in data), dtype=bool, count=len(data)
    )


def generate_plot(mode='global', scenario='split', seed=None):
    """Render the demo scatter plot and return it as a base64-encoded PNG.

    Synthetic 128-dimensional vectors are generated, reduced to a point with
    ``AdaptiveVectorSystem.predict_point``, projected to 2-D with PCA and
    drawn on a dark-themed figure.

    Args:
        mode: Passed through to ``predict_point`` (``'global'`` or
            ``'cluster'``).
        scenario: ``'split'`` for two dense islands plus uniform noise; any
            other value produces a single Gaussian blob.
        seed: Seed for the data generator.  Defaults to the current Unix time
            in whole seconds, so calls made within the same second draw
            identical data.

    Returns:
        The PNG image bytes, base64-encoded as a ``str``.
    """
    if seed is None:
        seed = int(time.time())
    # A local RandomState yields the same stream as np.random.seed(seed)
    # without clobbering the process-wide generator.
    rng = np.random.RandomState(seed)
    data = _demo_vectors(scenario, rng)

    system = AdaptiveVectorSystem()
    center_vec, score, used_vectors = system.predict_point(data, mode)

    # Fit PCA on inputs + center together so they share one coordinate space.
    projected = PCA(n_components=2).fit_transform(np.vstack([data, center_vec]))
    pts_2d, center_2d = projected[:-1], projected[-1]

    # Which input rows contributed to the center (drives the colouring).
    if mode == 'global':
        is_used = np.ones(len(data), dtype=bool)
    else:
        is_used = _rows_in(data, used_vectors)

    fig, ax = plt.subplots(figsize=(7, 5), facecolor='#202020')
    try:
        ax.set_facecolor('#303030')

        # 1. Ignored points (grey, transparent).
        if not np.all(is_used):
            ax.scatter(pts_2d[~is_used, 0], pts_2d[~is_used, 1],
                       c='#555555', alpha=0.3, s=30,
                       label='Ignored (Noise/Other)')

        # 2. Used points (bright cyan).
        ax.scatter(pts_2d[is_used, 0], pts_2d[is_used, 1],
                   c='#00e5ff', alpha=0.8, s=40, edgecolors='none',
                   label='Constituent Inputs')

        # 3. Faint "gravity lines" from used points to the center; skipped
        #    for large sets to keep the picture readable.
        if np.sum(is_used) < 100:
            for pt in pts_2d[is_used]:
                ax.plot([pt[0], center_2d[0]], [pt[1], center_2d[1]],
                        c='#00e5ff', alpha=0.15, linewidth=1)

        # 4. The predicted point.
        ax.scatter(center_2d[0], center_2d[1], c='#ff3366', s=200, marker='X',
                   edgecolors='white', linewidth=1.5,
                   label='Generated Vector', zorder=10)

        # Styling.
        ax.set_title(f"Mode: {mode.upper()} | Score: {score:.2f}/10",
                     color='white', fontsize=12, pad=10)
        ax.grid(True, color='#444444', linestyle='--', alpha=0.5)

        leg = ax.legend(facecolor='#303030', edgecolor='#555555',
                        fontsize=8, loc='best')
        for text in leg.get_texts():
            text.set_color("white")

        ax.tick_params(axis='x', colors='white')
        ax.tick_params(axis='y', colors='white')
        for spine in ax.spines.values():
            spine.set_edgecolor('#555555')

        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
    finally:
        # Always release the figure, even if drawing or encoding fails.
        plt.close(fig)

    return base64.b64encode(buf.getvalue()).decode('utf-8')
Dynamic Thresholding Algorithm