| | import os |
| | import io |
| | import json |
| | import math |
| | import tempfile |
| | from pathlib import Path |
| | from typing import List, Tuple, Dict |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | import matplotlib |
| | matplotlib.use("Agg") |
| | import matplotlib.pyplot as plt |
| |
|
| | import gradio as gr |
| |
|
| | |
| | from docx import Document |
| | import traceback |
| |
|
| | |
| | |
| | from sklearn.feature_extraction.text import HashingVectorizer, TfidfVectorizer |
| | from sklearn.decomposition import PCA |
| |
|
| | |
| | _ST_MODEL = None |
| | def _load_st_model(): |
| | global _ST_MODEL |
| | if _ST_MODEL is not None: |
| | return _ST_MODEL |
| | try: |
| | from sentence_transformers import SentenceTransformer |
| | |
| | _ST_MODEL = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") |
| | return _ST_MODEL |
| | except Exception as e: |
| | return None |
| |
|
| | def _resolve_file_input(file_obj): |
| | """Return (bytes_io, display_name) for a variety of Gradio/HF file input shapes. |
| | Supports: tempfile objects, dicts with 'name'/'path'/'data', raw path strings, or bytes. |
| | """ |
| | import io, os |
| | |
| | if isinstance(file_obj, dict): |
| | |
| | for key in ("path", "name"): |
| | p = file_obj.get(key) |
| | if isinstance(p, str) and os.path.exists(p): |
| | with open(p, "rb") as f: |
| | return io.BytesIO(f.read()), os.path.basename(p) |
| | |
| | data = file_obj.get("data") |
| | if isinstance(data, (bytes, bytearray)): |
| | return io.BytesIO(bytes(data)), file_obj.get("orig_name", "upload.docx") |
| | |
| | if hasattr(file_obj, "read") and hasattr(file_obj, "name"): |
| | try: |
| | file_obj.seek(0) |
| | content = file_obj.read() |
| | if isinstance(content, (bytes, bytearray)): |
| | return io.BytesIO(content), os.path.basename(getattr(file_obj, "name", "upload.docx")) |
| | except Exception: |
| | pass |
| | |
| | p = getattr(file_obj, "name", None) |
| | if isinstance(p, str) and os.path.exists(p): |
| | with open(p, "rb") as f: |
| | return io.BytesIO(f.read()), os.path.basename(p) |
| | |
| | if isinstance(file_obj, str) and os.path.exists(file_obj): |
| | with open(file_obj, "rb") as f: |
| | import os |
| | return io.BytesIO(f.read()), os.path.basename(file_obj) |
| | |
| | if isinstance(file_obj, (bytes, bytearray)): |
| | return io.BytesIO(bytes(file_obj)), "upload.docx" |
| | |
| | return None, "upload.docx" |
| |
|
| | def read_docx_any(file_obj) -> List[str]: |
| | bio, _ = _resolve_file_input(file_obj) |
| | if bio is None: |
| | raise ValueError("Could not read uploaded .docx file; unsupported input shape.") |
| | doc = Document(bio) |
| | paras = [p.text.strip() for p in doc.paragraphs] |
| | paras = [p for p in paras if p and not p.isspace()] |
| | return paras |
| |
|
| | def _basic_sentence_split(text: str) -> List[str]: |
| | |
| | |
| | import re |
| | rough = re.split(r'[\n\r]+|(?<=[\.\!\?])\s+', text.strip()) |
| | out = [] |
| | for s in rough: |
| | s = s.strip() |
| | if len(s) > 0: |
| | out.append(s) |
| | return out |
| |
|
| | def paragraphs_to_units(paras: List[str], mode: str = "paragraphs") -> List[str]: |
| | if mode == "paragraphs": |
| | return paras |
| | elif mode == "sentences": |
| | units = [] |
| | for p in paras: |
| | units.extend(_basic_sentence_split(p)) |
| | return units |
| | else: |
| | return paras |
| |
|
| | def embed_texts(texts: List[str], prefer_sentence_transformer: bool = True) -> Tuple[np.ndarray, str]: |
| | """ |
| | Returns L2-normalized embeddings [N, d] and a string describing the backend. |
| | Tries SentenceTransformer; if not available, falls back to HashingVectorizer. |
| | """ |
| | texts = [t if isinstance(t, str) else str(t) for t in texts] |
| | if prefer_sentence_transformer: |
| | model = _load_st_model() |
| | if model is not None: |
| | try: |
| | vecs = model.encode(texts, batch_size=32, show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True) |
| | return vecs.astype(np.float32), "sentence-transformers/all-MiniLM-L6-v2" |
| | except Exception as e: |
| | pass |
| |
|
| | |
| | hv = HashingVectorizer(n_features=768, alternate_sign=False, norm=None) |
| | X = hv.transform(texts) |
| | |
| | vecs = X.toarray().astype(np.float32) |
| | |
| | norms = np.linalg.norm(vecs, axis=1, keepdims=True) + 1e-9 |
| | vecs = vecs / norms |
| | return vecs, "HashingVectorizer(768d) fallback" |
| |
|
| | |
| |
|
| | def softmax(x, axis=-1): |
| | x = x - np.max(x, axis=axis, keepdims=True) |
| | ex = np.exp(x) |
| | return ex / (np.sum(ex, axis=axis, keepdims=True) + 1e-9) |
| |
|
| | def global_range_entropy(p: np.ndarray) -> float: |
| | """ |
| | p: [N, K] soft assignments. |
| | m_j = mean_i p_ij |
| | H_g = - sum_j m_j log m_j |
| | """ |
| | m = p.mean(axis=0) |
| | m_safe = np.clip(m, 1e-12, None) |
| | return float(-(m_safe * np.log(m_safe)).sum()) |
| |
|
| | def soft_slab_entropy(z: np.ndarray, U: np.ndarray, bins: int = 8, tau: float = 5.0) -> float: |
| | """ |
| | z: [N, d] normalized embeddings |
| | U: [K, d] anchor directions (assumed normalized) |
| | Returns average entropy across anchors of a soft histogram over projected coordinates. |
| | """ |
| | |
| | t = z @ U.T |
| | K = U.shape[0] |
| | Hs = [] |
| | for j in range(K): |
| | tj = t[:, j] |
| | tmin, tmax = float(tj.min()), float(tj.max()) |
| | if not np.isfinite(tmin) or not np.isfinite(tmax) or tmax - tmin < 1e-6: |
| | Hs.append(0.0) |
| | continue |
| | centers = np.linspace(tmin, tmax, bins) |
| | |
| | |
| | dist2 = (tj[:, None] - centers[None, :]) ** 2 |
| | weights = softmax(-tau * dist2, axis=1) |
| | hist = weights.mean(axis=0) |
| | hist = np.clip(hist, 1e-12, None) |
| | H = float(-(hist * np.log(hist)).sum()) |
| | Hs.append(H) |
| | return float(np.mean(Hs)) if len(Hs) > 0 else 0.0 |
| |
|
| | def kmeans_plus_plus_init(z: np.ndarray, K: int, rng: np.random.RandomState) -> np.ndarray: |
| | |
| | |
| | N, d = z.shape |
| | inds = [] |
| | |
| | inds.append(rng.randint(0, N)) |
| | centers = [z[inds[0]]] |
| | |
| | cos0 = np.clip(z @ centers[0], -1.0, 1.0) |
| | d2 = 1.0 - cos0 |
| | d2 = np.clip(d2, 1e-12, None) |
| | for _ in range(1, K): |
| | s = d2.sum() |
| | if not np.isfinite(s) or s <= 0: |
| | probs = np.full(N, 1.0 / N) |
| | else: |
| | probs = d2 / s |
| | probs = np.clip(probs, 0.0, None) |
| | s2 = probs.sum() |
| | if s2 <= 0 or not np.isfinite(s2): |
| | probs = np.full(N, 1.0 / N) |
| | else: |
| | probs = probs / s2 |
| | next_idx = rng.choice(N, p=probs) |
| | inds.append(next_idx) |
| | centers.append(z[next_idx]) |
| | cos_new = np.clip(z @ z[next_idx], -1.0, 1.0) |
| | d2 = np.minimum(d2, 1.0 - cos_new) |
| | d2 = np.clip(d2, 1e-12, None) |
| | U = np.stack(centers, axis=0) |
| | U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9) |
| | return U |
| |
|
| | def chr_optimize(z: np.ndarray, K: int = 8, iters: int = 30, beta: float = 12.0, bins: int = 8, tau: float = 5.0, seed: int = 42): |
| | """ |
| | Unsupervised CHR optimizer: |
| | - Initialize K anchor directions U via k-means++ on cosine distance. |
| | - Iterate: |
| | p_ij = softmax(beta * z_i · U_j) |
| | U_j = normalize( sum_i p_ij * z_i ) |
| | Returns final U, p, trajectories of global entropy and slab entropy. |
| | """ |
| | rng = np.random.RandomState(seed) |
| | N, d = z.shape |
| | U = kmeans_plus_plus_init(z, K, rng) if N >= K else np.pad(z, ((0, max(0, K - N)), (0, 0)), mode='wrap')[:K] |
| | |
| | U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9) |
| |
|
| | |
| | logits0 = beta * (z @ U.T) |
| | p0 = softmax(logits0, axis=1) |
| | Hg0 = global_range_entropy(p0) |
| | Hs0 = soft_slab_entropy(z, U, bins=bins, tau=tau) |
| |
|
| | Hg_traj = [Hg0] |
| | Hs_traj = [Hs0] |
| |
|
| | for _ in range(iters): |
| | logits = beta * (z @ U.T) |
| | p = softmax(logits, axis=1) |
| | |
| | numer = p.T @ z |
| | |
| | denom = p.sum(axis=0)[:, None] + 1e-9 |
| | U = numer / denom |
| | |
| | U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9) |
| |
|
| | Hg = global_range_entropy(p) |
| | Hs = soft_slab_entropy(z, U, bins=bins, tau=tau) |
| | Hg_traj.append(Hg) |
| | Hs_traj.append(Hs) |
| |
|
| | |
| | logits = beta * (z @ U.T) |
| | p = softmax(logits, axis=1) |
| | return U, p, np.array(Hg_traj), np.array(Hs_traj) |
| |
|
| | def compute_mhep(Hg_traj: np.ndarray, Hs_traj: np.ndarray, K: int, bins: int, w_g: float = 0.7, w_s: float = 0.3) -> float: |
| | """ |
| | Maximum Harvestable Energy Potential (MHEP) as a percentage. |
| | Normalizes entropy drops by theoretical maxima (log K for global, log bins for slab). |
| | """ |
| | if len(Hg_traj) < 2 or len(Hs_traj) < 2: |
| | return 0.0 |
| | maxHg = math.log(max(K, 2)) |
| | maxHs = math.log(max(bins, 2)) |
| |
|
| | drop_g = max(0.0, float(Hg_traj[0] - Hg_traj[-1])) / (maxHg + 1e-9) |
| | drop_s = max(0.0, float(Hs_traj[0] - Hs_traj[-1])) / (maxHs + 1e-9) |
| | score = 100.0 * (w_g * drop_g + w_s * drop_s) |
| | |
| | return float(np.clip(score, 0.0, 100.0)) |
| |
|
| | def structure_outputs(texts: List[str], z: np.ndarray, U: np.ndarray, p: np.ndarray) -> Tuple[pd.DataFrame, Dict[int, str]]: |
| | """ |
| | Create a structured table sorted by constellation and radial order, |
| | and summarize each constellation with top keywords. |
| | """ |
| | N, d = z.shape |
| | K = U.shape[0] |
| | |
| | labels = p.argmax(axis=1) |
| | |
| | proj = z @ U.T |
| | radial = proj[np.arange(N), labels] |
| |
|
| | df = pd.DataFrame({ |
| | "constellation": labels.astype(int), |
| | "radial_order": radial, |
| | "text": texts, |
| | "char_len": [len(t) for t in texts], |
| | "word_count": [len(t.split()) for t in texts], |
| | "confidence": p.max(axis=1) |
| | }) |
| | |
| | df = df.sort_values(by=["constellation", "radial_order"], ascending=[True, False]).reset_index(drop=True) |
| |
|
| | |
| | summaries = {} |
| | for j in range(K): |
| | cluster_texts = [texts[i] for i in range(N) if labels[i] == j] |
| | if len(cluster_texts) == 0: |
| | summaries[j] = "(empty)" |
| | continue |
| | |
| | corpus = [" ".join(cluster_texts), " ".join([texts[i] for i in range(N) if labels[i] != j])] |
| | try: |
| | tfidf = TfidfVectorizer(max_features=1000, ngram_range=(1,2), stop_words="english") |
| | X = tfidf.fit_transform(corpus) |
| | vocab = np.array(tfidf.get_feature_names_out()) |
| | |
| | scores = (X[0].toarray()[0] - X[1].toarray()[0]) |
| | idx = np.argsort(-scores)[:8] |
| | top_terms = [vocab[i] for i in idx if scores[i] > 0] |
| | summaries[j] = ", ".join(top_terms[:8]) if top_terms else "(generic)" |
| | except Exception as e: |
| | summaries[j] = "(summary unavailable)" |
| |
|
| | return df, summaries |
| |
|
| | def pca_plot(z: np.ndarray, U: np.ndarray, labels: np.ndarray, out_path: str): |
| | """ |
| | 2D PCA plot of points colored by constellation, with anchor stars. |
| | NOTE: We do not set any explicit colors or styles per instruction. |
| | """ |
| | if z.shape[1] > 2: |
| | pca = PCA(n_components=2, random_state=0) |
| | Z2 = pca.fit_transform(z) |
| | U2 = pca.transform(U) |
| | else: |
| | Z2 = z |
| | U2 = U |
| |
|
| | plt.figure(figsize=(6, 5)) |
| | |
| | plt.scatter(Z2[:, 0], Z2[:, 1], s=14, alpha=0.8, c=labels) |
| | |
| | plt.scatter(U2[:, 0], U2[:, 1], marker="*", s=180) |
| | plt.title("Constellation Map (PCA)") |
| | plt.xlabel("PC1") |
| | plt.ylabel("PC2") |
| | plt.tight_layout() |
| | plt.savefig(out_path, dpi=150) |
| | plt.close() |
| |
|
| | def process_pipeline(docx_file, units_mode, K, iters, beta, bins, tau, seed): |
| | if docx_file is None: |
| | return gr.update(value="# Please upload a .docx file."), None, None, None, None |
| |
|
| | |
| | paras = read_docx_any(docx_file) |
| | units = paragraphs_to_units(paras, mode=units_mode) |
| |
|
| | if len(units) == 0: |
| | return gr.update(value="# The document appears to be empty."), None, None, None, None |
| |
|
| | |
| | Z, backend = embed_texts(units, prefer_sentence_transformer=True) |
| |
|
| | |
| | U, p, Hg_traj, Hs_traj = chr_optimize(Z, K=int(K), iters=int(iters), beta=float(beta), bins=int(bins), tau=float(tau), seed=int(seed)) |
| | labels = p.argmax(axis=1) |
| |
|
| | |
| | Hg0, HgT = float(Hg_traj[0]), float(Hg_traj[-1]) |
| | Hs0, HsT = float(Hs_traj[0]), float(Hs_traj[-1]) |
| | mhep = compute_mhep(Hg_traj, Hs_traj, K=int(K), bins=int(bins)) |
| |
|
| | |
| | df, summaries = structure_outputs(units, Z, U, p) |
| |
|
| | |
| | tmpdir = tempfile.mkdtemp() |
| | csv_path = os.path.join(tmpdir, "constellations.csv") |
| | json_path = os.path.join(tmpdir, "constellations.json") |
| | plot_path = os.path.join(tmpdir, "constellations_pca.png") |
| |
|
| | df.to_csv(csv_path, index=False) |
| | with open(json_path, "w", encoding="utf-8") as f: |
| | json.dump(df.to_dict(orient="records"), f, ensure_ascii=False, indent=2) |
| |
|
| | |
| | pca_plot(Z, U, labels, plot_path) |
| |
|
| | |
| | md = [] |
| | md.append("# Constellation Harvest Regularization (CHR)") |
| | md.append("**Backend embeddings:** " + str(backend)) |
| | md.append("") |
| | md.append(f"**K (constellations):** {K} **Iterations:** {iters} **Beta:** {beta}") |
| | md.append(f"**Bins:** {bins} **Tau:** {tau}") |
| | md.append("") |
| | md.append("## Harvest Metrics") |
| | md.append(f"- Global range entropy (start → end): **{Hg0:.4f} → {HgT:.4f}**") |
| | md.append(f"- Slab entropy (start → end): **{Hs0:.4f} → {HsT:.4f}**") |
| | md.append(f"- **Maximum Harvestable Energy Potential (MHEP): {mhep:.1f}%**") |
| | md.append("") |
| | md.append("## Constellation Summaries") |
| | for j in range(int(K)): |
| | md.append(f"- **Constellation {j}**: {summaries.get(j, '(n/a)')}") |
| |
|
| | report_md = "\n".join(md) |
| |
|
| | return report_md, plot_path, df, csv_path, json_path |
| |
|
| |
|
| | |
| |
|
| | INTRO_MD = """ |
| | # Constellation Harvest Regularization (CHR) |
| | **Arrange your document into data constellations for maximum harvestable energy.** |
| | Upload a **.docx** file. We embed each unit (paragraphs or sentences), then **optimize a set of constellation directions** to **reduce range entropy** and **align slabs** (the CHR principle). |
| | You’ll get: |
| | - A **harvest score** (MHEP) showing how much structure we extracted. |
| | - A **constellation map** (2D PCA) with anchors (★) and your units as points. |
| | - A **structured table** grouped by constellation and ordered along each ray. |
| | - **CSV/JSON** exports for your pipeline. |
| | """ |
| |
|
| | HOW_MD = """ |
| | ## How it Works (Short Version) |
| | - We convert your document into units (**paragraphs** by default; you can switch to **sentences**). |
| | - We compute embeddings (MiniLM or a local fallback). |
| | - We initialize **K** anchor directions and iteratively adjust them to **lower the global range entropy** while forming **low-entropy slabs** along each anchor. |
| | - The **Maximum Harvestable Energy Potential (MHEP)** combines the normalized drop in global and slab entropy. |
| | - We then **group units by constellation** and **order them radially**, making the dataset easier to exploit downstream (routing, chunking, sparsity). |
| | |
| | **Tip:** Increase **K** for more granular constellations; increase **iterations** or **beta** for sharper structures. |
| | """ |
| |
|
| | with gr.Blocks(title="Constellation Harvest Regularization (CHR)") as demo: |
| | gr.Markdown(INTRO_MD) |
| |
|
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | docx_file = gr.File(label=".docx document", file_types=[".docx"], file_count="single") |
| | units_mode = gr.Radio(choices=["paragraphs", "sentences"], value="paragraphs", label="Unit granularity") |
| | K = gr.Slider(2, 24, value=8, step=1, label="K (number of constellations)") |
| | iters = gr.Slider(5, 100, value=30, step=1, label="Iterations") |
| | beta = gr.Slider(2, 30, value=12, step=1, label="Beta (softmax sharpness)") |
| | bins = gr.Slider(3, 16, value=8, step=1, label="Bins (slab histogram)") |
| | tau = gr.Slider(1, 20, value=5, step=1, label="Tau (slab softness)") |
| | seed = gr.Slider(0, 9999, value=42, step=1, label="Seed") |
| |
|
| | run_btn = gr.Button("Process", variant="primary") |
| |
|
| | with gr.Column(scale=1): |
| | report_md = gr.Markdown("# Upload a file to begin.") |
| | plot = gr.Image(label="Constellation Map (PCA)", type="filepath") |
| | gr.Markdown(HOW_MD) |
| |
|
| | df_out = gr.Dataframe(label="Structured Output (head)", wrap=True, interactive=False) |
| | with gr.Row(): |
| | csv_out = gr.File(label="Download CSV") |
| | json_out = gr.File(label="Download JSON") |
| |
|
| | |
| | run_btn.click(process_pipeline, |
| | inputs=[docx_file, units_mode, K, iters, beta, bins, tau, seed], |
| | outputs=[report_md, plot, df_out, csv_out, json_out]) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch() |