# testing_space / app.py
# Author: everydaytok — "Update app.py" (commit 4d4378a, verified)
import numpy as np
import matplotlib.pyplot as plt
import io, base64
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from sklearn.decomposition import PCA
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import euclidean_distances
import time

# FastAPI application serving the single demo page rendered by root().
app = FastAPI()
class AdaptiveVectorSystem:
    """Predict a representative vector from a set of input vectors.

    Two strategies:
      * ``'global'``  -- mean of all vectors. Great when the data has
        converged; terrible when it is split into islands.
      * ``'cluster'`` -- mean of the largest dense cluster, found with a
        dynamically-thresholded agglomerative clustering. Robust to split
        data and noise.
    """

    def _calculate_score(self, target, constituents):
        """
        Generate a score (-1.0 to 10.0) representing convergence 'effort'.

        Returns -1.0 for an empty constituent set. Higher scores mean the
        constituents sit close to ``target`` with little scatter.
        """
        if len(constituents) == 0:
            return -1.0
        # Distance of each constituent from the calculated center.
        dists = np.linalg.norm(constituents - target, axis=1)
        mean_dist = np.mean(dists)  # how far, on average
        std_dev = np.std(dists)     # how chaotic/scattered
        # Heuristic: high score for low mean AND low std dev. Normalizing
        # by mean_dist keeps the chaos measure scale-invariant; the 1e-9
        # guards against division by zero when all points coincide.
        variation_coefficient = (std_dev / (mean_dist + 1e-9))
        # Base score starts at 10; penalize variation (chaos) heavily and
        # raw distance lightly.
        penalty = (variation_coefficient * 6.0) + (mean_dist * 0.1)
        score = 10.0 - penalty
        return max(-1.0, min(10.0, score))

    def predict_point(self, vectors, mode='global'):
        """Return ``(center, score, constituent_vectors)`` for ``vectors``.

        Parameters
        ----------
        vectors : array-like of shape (n, d)
            Input vectors.
        mode : str
            ``'global'`` averages everything; ``'cluster'`` averages the
            largest dense cluster, ignoring noise.

        Raises
        ------
        ValueError
            If ``mode`` is not recognized (the previous behavior was to
            silently return None, which crashed at the caller's unpack).
        """
        data = np.array(vectors)
        # --- GLOBAL MODE ---
        # "Force a fit for everyone."
        # Great for converged data, terrible for split data.
        if mode == 'global':
            center = np.mean(data, axis=0)
            score = self._calculate_score(center, data)
            return center, score, data
        # --- CLUSTER MODE ---
        # "Find the strongest gravity well."
        elif mode == 'cluster':
            # With fewer than 3 points there is nothing meaningful to
            # cluster: the pairwise-distance mean below would be the mean
            # of an empty array (NaN) and AgglomerativeClustering needs
            # at least 2 samples. Fall back to the global average.
            if len(data) < 3:
                return self.predict_point(data, mode='global')
            # 1. Pairwise distances give the "scale" of the data.
            dist_matrix = euclidean_distances(data, data)
            # Upper triangle (k=1) excludes self-distances and duplicates.
            all_dists = dist_matrix[np.triu_indices(len(data), k=1)]
            avg_global_dist = np.mean(all_dists)
            # 2. DYNAMIC THRESHOLDING: to belong to a group, points must
            # be significantly closer than the global average spacing.
            dynamic_thresh = avg_global_dist * 0.65
            # 3. Cluster with the dynamic threshold (no fixed cluster count).
            clusterer = AgglomerativeClustering(
                n_clusters=None,
                metric='euclidean',
                linkage='ward',
                distance_threshold=dynamic_thresh
            )
            labels = clusterer.fit_predict(data)
            # 4. Find the "best" cluster: the largest one, ignoring
            # "noise" clusters of size 1 or 2.
            unique_labels, counts = np.unique(labels, return_counts=True)
            valid_clusters = [l for l, c in zip(unique_labels, counts) if c > 2]
            if not valid_clusters:
                # Fallback if everything is noise: treat it as one group.
                return self.predict_point(data, mode='global')
            # Pick the largest of the valid clusters. (Picking the
            # densest is an alternative, but largest is usually safest.)
            best_label = max(valid_clusters,
                             key=lambda l: counts[np.where(unique_labels == l)][0])
            # 5. Average the winning cluster's members.
            cluster_vectors = data[labels == best_label]
            center = np.mean(cluster_vectors, axis=0)
            score = self._calculate_score(center, cluster_vectors)
            return center, score, cluster_vectors
        raise ValueError(f"Unknown mode: {mode!r} (expected 'global' or 'cluster')")
# --- VISUALIZATION LOGIC ---
def generate_plot(mode='global', scenario='split'):
    """Run AdaptiveVectorSystem on synthetic 128-D data and render a 2-D PCA plot.

    Parameters
    ----------
    mode : str
        ``'global'`` or ``'cluster'``; forwarded to ``predict_point``.
    scenario : str
        ``'split'`` builds two dense islands plus uniform noise; any other
        value builds one tight cluster.

    Returns
    -------
    str
        Base64-encoded PNG of the rendered figure.
    """
    # Generate 128-dimension vectors.
    # NOTE(review): the original comment claimed a "consistent seed for
    # demo", but time.time() yields a different seed every second, so
    # every page load draws fresh data. Use a constant here if true
    # reproducibility is wanted.
    np.random.seed(int(time.time()))
    if scenario == 'split':
        # Two dense islands far apart, plus random scatter.
        c1 = np.random.normal(0, 0.5, (100, 128))  # island 1: centered at 0
        # Island 2: centered at 8 (in 128-D the centers sit ~8*sqrt(128) ~= 90 units apart)
        c2 = np.random.normal(8, 0.5, (100, 128))
        noise = np.random.uniform(-5, 15, (10, 128))
        data = np.vstack([c1, c2, noise])
    else:
        # One tight cluster.
        data = np.random.normal(0, 1.0, (50, 128))

    # Run the system.
    avs = AdaptiveVectorSystem()
    center_vec, score, used_vectors = avs.predict_point(data, mode)

    # PCA for the 2-D view.
    # Important: fit PCA on input + center so they share the same coordinate space.
    pca = PCA(n_components=2)
    all_points = np.vstack([data, center_vec])
    projected = pca.fit_transform(all_points)
    pts_2d = projected[:-1]
    center_2d = projected[-1]

    # --- Plotting ---
    plt.figure(figsize=(7, 5), facecolor='#202020')
    ax = plt.gca()
    ax.set_facecolor('#303030')

    # Identify which points were used (for coloring).
    # NOTE: in production, pass indices around instead of matching rows.
    if mode == 'global':
        is_used = np.ones(len(data), dtype=bool)
    else:
        # Vectorized exact-row match, replacing the original O(n^2) Python
        # double loop. Exact float equality is safe here: the rows of
        # used_vectors are copied unmodified out of `data`.
        is_used = (data[:, None, :] == used_vectors[None, :, :]).all(axis=2).any(axis=1)

    # 1. Plot IGNORED points (grey, transparent).
    if not np.all(is_used):
        plt.scatter(pts_2d[~is_used, 0], pts_2d[~is_used, 1],
                    c='#555555', alpha=0.3, s=30, label='Ignored (Noise/Other)')
    # 2. Plot USED points (bright cyan).
    plt.scatter(pts_2d[is_used, 0], pts_2d[is_used, 1],
                c='#00e5ff', alpha=0.8, s=40, edgecolors='none', label='Constituent Inputs')
    # 3. Draw "gravity lines" (faint lines from used points to the center).
    # Only draw when there aren't too many points, to keep the plot clean.
    if np.sum(is_used) < 100:
        for pt in pts_2d[is_used]:
            plt.plot([pt[0], center_2d[0]], [pt[1], center_2d[1]],
                     c='#00e5ff', alpha=0.15, linewidth=1)
    # 4. Plot the PREDICTED POINT (red X).
    plt.scatter(center_2d[0], center_2d[1],
                c='#ff3366', s=200, marker='X', edgecolors='white', linewidth=1.5,
                label='Generated Vector', zorder=10)

    # Styling.
    plt.title(f"Mode: {mode.upper()} | Score: {score:.2f}/10", color='white', fontsize=12, pad=10)
    plt.grid(True, color='#444444', linestyle='--', alpha=0.5)
    # Legend formatting.
    leg = plt.legend(facecolor='#303030', edgecolor='#555555', fontsize=8, loc='best')
    for text in leg.get_texts():
        text.set_color("white")
    # Axis colors.
    ax.tick_params(axis='x', colors='white')
    ax.tick_params(axis='y', colors='white')
    for spine in ax.spines.values():
        spine.set_edgecolor('#555555')

    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight')
    plt.close()
    return base64.b64encode(buf.getvalue()).decode('utf-8')
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the demo page: three rendered plots comparing modes and scenarios."""
    # Pre-render each figure as a base64 PNG payload for inline embedding.
    split_global_png = generate_plot('global', 'split')
    split_cluster_png = generate_plot('cluster', 'split')
    tight_global_png = generate_plot('global', 'tight')
    return f"""
<html>
<body style="font-family: 'Segoe UI', sans-serif; background:#121212; color:#e0e0e0; text-align:center; padding:20px;">
<h1 style="margin-bottom:10px;">Vector Convergence System</h1>
<p style="color:#888; margin-bottom:40px;">Dynamic Thresholding Algorithm</p>
<div style="display:flex; flex-wrap:wrap; justify-content:center; gap:20px;">
<!-- SCENARIO A -->
<div style="background:#1e1e1e; padding:20px; border-radius:12px; border:1px solid #333;">
<h2 style="color:#aaa; border-bottom:1px solid #333; padding-bottom:10px;">Scenario: Split Data</h2>
<div style="display:flex; gap:20px;">
<div>
<h3 style="color:#00e5ff;">Global Mode</h3>
<div style="font-size:0.8em; color:#888; margin-bottom:5px;">Averages everything (Score -1 to 2)</div>
<img src="data:image/png;base64,{split_global_png}" width="400" style="border-radius:8px;"/>
</div>
<div>
<h3 style="color:#ff3366;">Cluster Mode (Revised)</h3>
<div style="font-size:0.8em; color:#888; margin-bottom:5px;">Identifies largest mass (Score 8 to 10)</div>
<img src="data:image/png;base64,{split_cluster_png}" width="400" style="border-radius:8px;"/>
</div>
</div>
</div>
<!-- SCENARIO B -->
<div style="background:#1e1e1e; padding:20px; border-radius:12px; border:1px solid #333;">
<h2 style="color:#aaa; border-bottom:1px solid #333; padding-bottom:10px;">Scenario: Converged Data</h2>
<div>
<h3 style="color:#00e5ff;">Global Mode</h3>
<div style="font-size:0.8em; color:#888; margin-bottom:5px;">Efficient calculation (Score ~10)</div>
<img src="data:image/png;base64,{tight_global_png}" width="400" style="border-radius:8px;"/>
</div>
</div>
</div>
</body>
</html>
"""