JacobLinCool's picture
deploy: sync GitHub main de5dbf9
13fe947 verified
from __future__ import annotations
from collections import Counter, defaultdict
from collections.abc import Mapping, Sequence
import math
from typing import Any
from hackathon_advisor.data import (
Project,
ProjectIndex,
normalize_project_tags,
public_project_summary,
public_project_title,
tokenize,
)
from hackathon_advisor.quest_taxonomy import QUESTS, normalize_match, quest_profiles
from hackathon_advisor._text import utc_now
# Written to the payload as "schema_version" and required by validate_dashboard_payload.
DASHBOARD_SCHEMA_VERSION = 1
# Seed passed as random_state to both the t-SNE layout and the KMeans clustering.
TSNE_RANDOM_STATE = 42
# Smallest project count accepted by build_dashboard_payload / validate_dashboard_payload.
TSNE_MIN_PROJECTS = 3
# Number of most-similar neighbours kept per project in _nearest_links.
LINKS_PER_PROJECT = 2
# Emitted verbatim in the payload as "cluster_label_algorithm".
CLUSTER_LABEL_ALGORITHM = "distinctive-keywords-v1"
# Tokens that _is_cluster_keyword rejects outright when picking cluster keywords.
# The grouping below is editorial only; membership is what matters.
STOPWORDS = {
    # Articles, pronouns, prepositions and other function words.
    "about", "all", "an", "and", "are", "as", "at",
    "be", "been", "before", "being", "by",
    "for", "from",
    "in", "is", "it", "its",
    "me", "my",
    "of", "on", "one", "or", "our",
    "the", "their", "them", "these", "they", "this", "those", "to",
    "us",
    "we", "with",
    "you", "your",
    # Generic verbs.
    "build", "built",
    "made", "make", "makes",
    "run", "runs",
    "try",
    "use", "used", "uses", "using",
    # Hackathon / platform vocabulary.
    "agent", "ai", "app", "apps", "assistant",
    "build-small", "build-small-hackathon",
    "demo",
    "face", "first",
    "gradio",
    "hackathon", "hugging", "huggingface",
    "local",
    "model", "models",
    "pro", "project", "projects",
    "region",
    "small", "space", "spaces", "submission",
    "tool", "tools",
}
class DashboardError(ValueError):
    """Raised when dashboard inputs or a dashboard payload fail validation."""
def build_dashboard_payload(
    index: ProjectIndex,
    *,
    quest_matches: Mapping[str, Sequence[Mapping[str, Any]]] | None = None,
    quest_source: str = "",
    generated_at: str | None = None,
) -> dict[str, Any]:
    """Assemble and validate the dashboard payload for a project index.

    Args:
        index: Source of the projects, their vectors and provenance fields.
        quest_matches: Optional quest matches keyed by project id; forwarded
            to ``_normalize_quest_matches``.
        quest_source: Copied into the quest report; a non-empty value marks
            the report as ``"analyzed"``.
        generated_at: Timestamp for the payload; ``utc_now()`` is used when
            it is falsy.

    Returns:
        The payload dictionary, after ``validate_dashboard_payload`` accepted it.

    Raises:
        DashboardError: If the index holds fewer than ``TSNE_MIN_PROJECTS``
            projects, or if a later step or the final validation rejects the data.
    """
    projects = list(index.projects)
    project_total = len(projects)
    if project_total < TSNE_MIN_PROJECTS:
        raise DashboardError(f"dashboard atlas requires at least {TSNE_MIN_PROJECTS} projects")

    # Layout and clustering both work on the same embedding matrix.
    matrix = _embedding_matrix(index)
    coordinates = _tsne_coordinates(matrix)
    raw_cluster_labels = _cluster_labels(matrix)
    cluster_id_by_raw, clusters = _cluster_payloads(projects, coordinates, raw_cluster_labels)

    normalized_quest_matches = _normalize_quest_matches(projects, quest_matches)
    points = _point_payloads(
        projects,
        coordinates,
        raw_cluster_labels,
        cluster_id_by_raw,
        normalized_quest_matches,
    )
    links = _nearest_links(projects, matrix)
    quest_report = _quest_report(points, normalized_quest_matches, quest_source)

    timestamp = generated_at if generated_at else utc_now()
    provenance = {
        "snapshot_generated_at": index.generated_at,
        "snapshot_source": index.source,
        "index_generated_at": index.index_generated_at,
        "index_algorithm": index.index_algorithm,
        "snapshot_digest": index.snapshot_digest,
        "embedding": index.embedding_metadata,
    }
    # Mirrors the arguments given to TSNE in _tsne_coordinates.
    layout = {
        "algorithm": "tsne",
        "metric": "cosine",
        "init": "pca",
        "random_state": TSNE_RANDOM_STATE,
        "perplexity": _tsne_perplexity(project_total),
    }
    payload = {
        "schema_version": DASHBOARD_SCHEMA_VERSION,
        "generated_at": timestamp,
        "project_count": project_total,
        "provenance": provenance,
        "layout": layout,
        "cluster_label_algorithm": CLUSTER_LABEL_ALGORITHM,
        "points": points,
        "links": links,
        "clusters": clusters,
        "quest_report": quest_report,
    }
    validate_dashboard_payload(payload)
    return payload
def validate_dashboard_payload(payload: Mapping[str, Any]) -> None:
    """Check that a dashboard payload is internally consistent.

    The checks, in order: schema version, minimum project count, one point
    object per project with a unique non-empty id, coordinates inside
    ``[0, 100]``, a non-empty cluster id on every point that is declared in
    ``clusters``, links that only reference known point ids, and a quest
    report whose status is ``"analyzed"`` or ``"not_analyzed"``.

    Args:
        payload: The dashboard payload mapping to inspect.

    Raises:
        DashboardError: On the first violated rule. Malformed values (a
            non-numeric project count, missing or non-numeric coordinates,
            links that are not objects) are reported as ``DashboardError``
            too, instead of escaping as ``TypeError`` / ``AttributeError``.
    """
    if payload.get("schema_version") != DASHBOARD_SCHEMA_VERSION:
        raise DashboardError("unsupported dashboard schema version")

    try:
        project_count = int(payload.get("project_count") or 0)
    except (TypeError, ValueError) as error:
        raise DashboardError("dashboard project count is invalid") from error
    if project_count < TSNE_MIN_PROJECTS:
        raise DashboardError("dashboard project count is too small")

    points = payload.get("points")
    if not isinstance(points, list) or len(points) != project_count:
        raise DashboardError("dashboard point count does not match project count")

    ids: set[str] = set()
    cluster_ids: set[str] = set()
    for point in points:
        if not isinstance(point, dict):
            raise DashboardError("dashboard points must be objects")
        project_id = str(point.get("id") or "")
        if not project_id or project_id in ids:
            raise DashboardError("dashboard points must have unique project ids")
        ids.add(project_id)
        try:
            # float(None) raises TypeError and float("abc") raises ValueError;
            # both mean the coordinate is unusable.
            x = float(point.get("x"))
            y = float(point.get("y"))
        except (TypeError, ValueError) as error:
            raise DashboardError("dashboard point coordinates must be percentages") from error
        # NaN fails both range comparisons and is therefore rejected as well.
        if not 0.0 <= x <= 100.0 or not 0.0 <= y <= 100.0:
            raise DashboardError("dashboard point coordinates must be percentages")
        cluster_id = str(point.get("cluster_id") or "")
        if not cluster_id:
            raise DashboardError("dashboard point cluster id is missing")
        cluster_ids.add(cluster_id)

    clusters = payload.get("clusters")
    if not isinstance(clusters, list) or not clusters:
        raise DashboardError("dashboard clusters are missing")
    declared_cluster_ids = {
        str(cluster.get("id") or "") for cluster in clusters if isinstance(cluster, dict)
    }
    if cluster_ids - declared_cluster_ids:
        raise DashboardError("dashboard points reference missing clusters")

    links = payload.get("links")
    if not isinstance(links, list):
        raise DashboardError("dashboard links must be a list")
    for link in links:
        # Guard before calling .get so a stray string/number fails cleanly.
        if not isinstance(link, Mapping):
            raise DashboardError("dashboard links must be objects")
        source = str(link.get("source") or "")
        target = str(link.get("target") or "")
        if source not in ids or target not in ids:
            raise DashboardError("dashboard link references an unknown project")

    quest_report = payload.get("quest_report")
    if not isinstance(quest_report, dict):
        raise DashboardError("dashboard quest report is missing")
    if quest_report.get("status") not in {"analyzed", "not_analyzed"}:
        raise DashboardError("dashboard quest report status is invalid")
def _embedding_matrix(index: ProjectIndex) -> Any:
import numpy as np
return np.asarray(index.project_vectors(), dtype=np.float32)
def _tsne_coordinates(matrix: Any) -> list[tuple[float, float]]:
    """Project ``matrix`` to 2-D with t-SNE and rescale via ``_scale_points``.

    The perplexity comes from ``_tsne_perplexity`` applied to the row count;
    the run is seeded with ``TSNE_RANDOM_STATE``.
    """
    from sklearn.manifold import TSNE

    sample_count = int(matrix.shape[0])
    reducer = TSNE(
        n_components=2,
        perplexity=_tsne_perplexity(sample_count),
        init="pca",
        learning_rate="auto",
        max_iter=1000,
        metric="cosine",
        random_state=TSNE_RANDOM_STATE,
    )
    embedded = reducer.fit_transform(matrix)
    return _scale_points(embedded)
def _tsne_perplexity(count: int) -> int:
return max(2, min(30, count // 4))
def _cluster_labels(matrix: Any) -> list[int]:
    """Assign each row of ``matrix`` to a KMeans cluster.

    The cluster count is ``round(sqrt(rows))``, raised to at least
    ``min(6, rows)`` and capped at 10. KMeans is seeded with
    ``TSNE_RANDOM_STATE`` and run with ``n_init=20``.

    Returns:
        One plain ``int`` label per row, in row order.
    """
    from sklearn.cluster import KMeans

    sample_count = int(matrix.shape[0])
    lower_bound = min(6, sample_count)
    sqrt_estimate = round(math.sqrt(sample_count))
    cluster_count = min(10, max(lower_bound, sqrt_estimate))
    model = KMeans(
        n_clusters=cluster_count,
        random_state=TSNE_RANDOM_STATE,
        n_init=20,
    )
    assignments = model.fit_predict(matrix)
    return list(map(int, assignments))
def _scale_points(points: Any, low: float = 3.0, high: float = 97.0) -> list[tuple[float, float]]:
import numpy as np
scaled = np.empty_like(points, dtype=np.float64)
for axis in range(points.shape[1]):
column = points[:, axis]
minimum = float(column.min())
maximum = float(column.max())
span = maximum - minimum
if span <= 1e-9:
scaled[:, axis] = (low + high) / 2.0
else:
scaled[:, axis] = low + (column - minimum) / span * (high - low)
return [(round(float(x), 4), round(float(y), 4)) for x, y in scaled]
def _cluster_payloads(
    projects: Sequence[Project],
    coordinates: Sequence[tuple[float, float]],
    raw_labels: Sequence[int],
) -> tuple[dict[int, str], list[dict[str, Any]]]:
    """Build the public cluster descriptions from raw cluster labels.

    Clusters are ordered by descending size, ties broken by their coordinate
    centre, and named ``cluster-1``, ``cluster-2``, ... in that order.

    Returns:
        A mapping from raw label to public cluster id, and the list of
        cluster payload dictionaries in the same order.
    """
    members_by_label: dict[int, list[int]] = defaultdict(list)
    for position, raw in enumerate(raw_labels):
        members_by_label[int(raw)].append(position)

    def size_then_center(raw: int) -> tuple[int, tuple[float, float]]:
        members = members_by_label[raw]
        return (-len(members), _cluster_center(coordinates, members))

    ranked_labels = sorted(members_by_label, key=size_then_center)
    cluster_id_by_raw = {
        raw: f"cluster-{rank}" for rank, raw in enumerate(ranked_labels, start=1)
    }

    corpus_frequency = _corpus_document_frequency(projects)
    clusters: list[dict[str, Any]] = []
    for raw in ranked_labels:
        members = members_by_label[raw]
        member_projects = [projects[member] for member in members]
        # Most-liked first, then most recently modified, then title (descending).
        representatives = sorted(
            member_projects,
            key=lambda project: (project.likes, project.last_modified, project.title.lower()),
            reverse=True,
        )[:4]
        keywords = _cluster_keywords(
            member_projects,
            corpus_document_frequency=corpus_frequency,
            corpus_project_count=len(projects),
        )
        if keywords:
            label = " / ".join(word.title() for word in keywords[:2])
        else:
            # No distinctive keyword survived; fall back to project titles.
            label = _representative_cluster_label(representatives)
        center_x, center_y = _cluster_center(coordinates, members)
        clusters.append(
            {
                "id": cluster_id_by_raw[raw],
                "label": label,
                "keywords": keywords,
                "project_count": len(members),
                "center": {"x": round(center_x, 4), "y": round(center_y, 4)},
                "representative_projects": [
                    project.to_public_dict() for project in representatives
                ],
            }
        )
    return cluster_id_by_raw, clusters
def _cluster_center(coordinates: Sequence[tuple[float, float]], indexes: Sequence[int]) -> tuple[float, float]:
return (
sum(coordinates[index][0] for index in indexes) / len(indexes),
sum(coordinates[index][1] for index in indexes) / len(indexes),
)
def _corpus_document_frequency(projects: Sequence[Project]) -> Counter[str]:
    """Count, for every keyword token, how many projects contain it at least once."""
    frequency: Counter[str] = Counter()
    for project in projects:
        # A set so repeated tokens inside one project count only once.
        for token in set(_project_keyword_tokens(project)):
            frequency[token] += 1
    return frequency
def _cluster_keywords(
    projects: Sequence[Project],
    *,
    corpus_document_frequency: Mapping[str, int],
    corpus_project_count: int,
) -> list[str]:
    """Pick up to five keywords that are distinctive for a cluster.

    Each candidate token is scored by the product of a log-damped occurrence
    count, its inverse document frequency in the whole corpus, and two
    weights for exclusivity (share of the token's documents that fall in this
    cluster) and coverage (share of the cluster's projects containing it).

    Args:
        projects: The projects belonging to the cluster.
        corpus_document_frequency: Per-token document counts for the corpus.
        corpus_project_count: Number of projects in the corpus.

    Returns:
        At most five tokens, best score first; ties fall back to cluster
        document count, occurrence count and finally alphabetical order.
    """
    members = list(projects)
    if not members:
        return []

    occurrences: Counter[str] = Counter()
    documents_with: Counter[str] = Counter()
    for member in members:
        member_tokens = _project_keyword_tokens(member)
        occurrences.update(member_tokens)
        documents_with.update(set(member_tokens))

    # Clusters of more than three projects need a token in two or more of them.
    required_documents = 2 if len(members) > 3 else 1

    # Sort keys are stored already negated so a plain ascending sort ranks them.
    ranked: list[tuple[float, int, int, str]] = []
    for token, occurrence_count in occurrences.items():
        in_cluster = documents_with[token]
        if in_cluster < required_documents:
            continue
        in_corpus = int(corpus_document_frequency.get(token) or 0)
        if in_corpus <= 0:
            continue
        idf = math.log((1 + corpus_project_count) / (1 + in_corpus))
        if idf <= 0.0:
            # The token is not rarer than "in every project".
            continue
        exclusivity = in_cluster / in_corpus
        coverage = in_cluster / len(members)
        frequency_weight = 1.0 + math.log(occurrence_count)
        score = (
            frequency_weight
            * idf
            * (0.35 + 0.65 * exclusivity)
            * (0.35 + 0.65 * coverage)
        )
        ranked.append((-score, -in_cluster, -occurrence_count, token))

    ranked.sort()
    return [entry[3] for entry in ranked[:5]]
def _project_keyword_tokens(project: Project) -> list[str]:
    """Tokenize a project's text fields and keep the tokens usable as keywords.

    The text is the title, the slug with ``-`` and ``_`` turned into spaces,
    the summary, the normalized tags and the model names, joined by spaces.
    """
    title = project.title
    slug_words = project.slug.replace("-", " ").replace("_", " ")
    summary = project.summary
    tag_text = " ".join(normalize_project_tags(project.tags))
    model_text = " ".join(project.models)
    text = " ".join((title, slug_words, summary, tag_text, model_text))
    return list(filter(_is_cluster_keyword, tokenize(text)))
def _is_cluster_keyword(token: str) -> bool:
    """Return whether ``token`` may be used as a cluster keyword.

    Stopwords, tokens beginning with ``"region"`` and all-digit tokens are rejected.
    """
    rejected = token in STOPWORDS or token.startswith("region") or token.isdigit()
    return not rejected
def _representative_cluster_label(projects: Sequence[Project]) -> str:
    """Build a fallback cluster label from the first two titled projects.

    Projects whose public title is ``"Untitled project"`` are skipped; when
    none remain the label is ``"Mixed projects"``.
    """
    chosen: list[str] = []
    for project in projects:
        if len(chosen) >= 2:
            break
        title = public_project_title(project.title)
        if title != "Untitled project":
            chosen.append(title)
    if not chosen:
        return "Mixed projects"
    return " / ".join(chosen)
def _normalize_quest_matches(
    projects: Sequence[Project],
    quest_matches: Mapping[str, Sequence[Mapping[str, Any]]] | None,
) -> dict[str, list[dict[str, Any]]]:
    """Return normalized quest matches for every project.

    Args:
        projects: The projects that must be covered.
        quest_matches: Matches keyed by project id, or ``None`` for "no analysis".

    Returns:
        A dict with one entry per project id; every list is empty when
        ``quest_matches`` is ``None``.

    Raises:
        DashboardError: If the keys of ``quest_matches`` are not exactly the
            project ids, or if an individual match is invalid.
    """
    normalized: dict[str, list[dict[str, Any]]] = {project.id: [] for project in projects}
    if quest_matches is None:
        return normalized

    expected_ids = set(normalized)
    provided_ids = set(quest_matches)
    if provided_ids != expected_ids:
        missing = expected_ids - provided_ids
        unknown = provided_ids - expected_ids
        problems: list[str] = []
        if missing:
            problems.append(f"missing {len(missing)} projects")
        if unknown:
            problems.append(f"unknown {len(unknown)} projects")
        raise DashboardError("quest analysis project coverage is invalid: " + ", ".join(problems))

    for project_id, matches in quest_matches.items():
        normalized[project_id] = [_normalize_quest_match(match) for match in matches]
    return normalized
def _normalize_quest_match(match: Mapping[str, Any]) -> dict[str, Any]:
    """Run ``normalize_match`` and re-raise its ``ValueError`` as ``DashboardError``."""
    try:
        normalized = normalize_match(match)
    except ValueError as error:
        raise DashboardError(f"invalid quest match: {error}") from error
    else:
        return normalized
def _point_payloads(
    projects: Sequence[Project],
    coordinates: Sequence[tuple[float, float]],
    raw_labels: Sequence[int],
    cluster_id_by_raw: Mapping[int, str],
    quest_matches: Mapping[str, Sequence[Mapping[str, Any]]],
) -> list[dict[str, Any]]:
    """Build one point dictionary per project for the dashboard scatter plot.

    ``projects``, ``coordinates`` and ``raw_labels`` are walked in lockstep;
    ``zip(..., strict=True)`` raises ``ValueError`` if their lengths differ.
    """
    points: list[dict[str, Any]] = []
    rows = zip(projects, coordinates, raw_labels, strict=True)
    for project, position, raw_label in rows:
        x, y = position
        matches = list(quest_matches.get(project.id) or [])
        quest_ids = [str(match["quest"]) for match in matches]
        point = {
            "id": project.id,
            "title": public_project_title(project.title),
            "summary": public_project_summary(project.summary),
            "url": project.url,
            "host": project.host,
            "likes": project.likes,
            "sdk": project.sdk,
            "models": list(project.models),
            "tags": list(normalize_project_tags(project.tags)),
            "last_modified": project.last_modified,
            "x": x,
            "y": y,
            "cluster_id": cluster_id_by_raw[int(raw_label)],
            "quest_matches": matches,
            "quest_ids": quest_ids,
        }
        points.append(point)
    return points
def _nearest_links(projects: Sequence[Project], matrix: Any) -> list[dict[str, Any]]:
    """Link every project to its ``LINKS_PER_PROJECT`` highest-scoring neighbours.

    Scores are the entries of ``matrix @ matrix.T``.
    NOTE(review): this equals cosine similarity only if the rows are
    unit-normalized -- confirm against ``ProjectIndex.project_vectors``.

    Returns:
        One ``{"source", "target", "score"}`` dictionary per unordered pair,
        sorted by descending score and then by row positions; the reported
        score is clamped to ``[0, 1]`` and rounded to four decimals.
    """
    import numpy as np

    similarity = matrix @ matrix.T
    best_scores: dict[tuple[int, int], float] = {}
    for row in range(len(projects)):
        ranked = np.argsort(similarity[row])[::-1]
        # Drop the project itself, then keep the leading neighbours.
        neighbours = ranked[ranked != row][:LINKS_PER_PROJECT].tolist()
        for neighbour in neighbours:
            # Store each pair once, smaller row position first.
            pair = (row, neighbour) if row < neighbour else (neighbour, row)
            best_scores[pair] = max(float(similarity[pair]), best_scores.get(pair, -1.0))

    ordered = sorted(best_scores.items(), key=lambda entry: (-entry[1], entry[0]))
    links: list[dict[str, Any]] = []
    for (left, right), score in ordered:
        links.append(
            {
                "source": projects[left].id,
                "target": projects[right].id,
                "score": round(max(0.0, min(1.0, score)), 4),
            }
        )
    return links
def _quest_report(
    points: Sequence[Mapping[str, Any]],
    quest_matches: Mapping[str, Sequence[Mapping[str, Any]]],
    quest_source: str,
) -> dict[str, Any]:
    """Summarize, per quest in ``QUESTS``, which points matched it.

    Args:
        points: Point dictionaries (need ``id``, ``title``, ``url``; ``likes`` optional).
        quest_matches: Normalized matches keyed by project id.
        quest_source: Copied to the report; non-empty means ``"analyzed"``.

    Returns:
        A dict with ``status``, ``source`` and a ``quests`` list holding, for
        each quest, its label, description, match count and up to four
        example points ordered by best confidence and then likes (descending).
    """
    profiles = {profile["id"]: profile for profile in quest_profiles()}
    status = "analyzed" if quest_source else "not_analyzed"

    quests = []
    for quest in QUESTS:
        # Pair each matching point with its best confidence for this quest.
        scored: list[tuple[Mapping[str, Any], float]] = []
        for point in points:
            point_matches = quest_matches.get(str(point["id"]), [])
            confidences = [
                float(match["confidence"])
                for match in point_matches
                if match["quest"] == quest
            ]
            if confidences:
                scored.append((point, max(confidences)))

        top_examples = sorted(
            scored,
            key=lambda pair: (pair[1], int(pair[0].get("likes") or 0)),
            reverse=True,
        )[:4]

        profile = profiles.get(quest, {"label": quest, "description": ""})
        examples = []
        for point, _confidence in top_examples:
            examples.append(
                {
                    "id": point["id"],
                    "title": point["title"],
                    "url": point["url"],
                }
            )
        quests.append(
            {
                "id": quest,
                "label": profile["label"],
                "description": profile["description"],
                "project_count": len(scored),
                "examples": examples,
            }
        )

    return {
        "status": status,
        "source": quest_source,
        "quests": quests,
    }