from __future__ import annotations
import logging
import math
import os
import socket
import warnings
from pathlib import Path
os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "False")
import gradio as gr
from backend.recommender import browse_spaces, recommend_spaces, refresh_spaces
from backend.spaces_index import CATEGORY_ORDER, filter_public_spaces, get_category_distribution, get_index_stats, load_spaces
from backend.storage import (
delete_feedback_comment,
get_recent_feedback,
init_db,
save_feedback,
update_feedback_comment,
)
from ui.cards import (
COMMENTS_MODAL_HTML,
COMMENTS_MODAL_SCRIPT,
FIELD_GUIDE_SHELL_SCRIPT,
render_category_section,
render_footer_shell,
render_page_shell,
render_recommendation_cards,
)
from ui.styles import APP_CSS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("httpx").setLevel(logging.WARNING)
warnings.filterwarnings("ignore", message=".*HTTP_422_UNPROCESSABLE_ENTITY.*")
CATEGORY_CHOICES = ["All"] + [category for category in CATEGORY_ORDER if category != "Other"] + ["Other"]
FEATURED_CATEGORIES = [
"Security, Risk & Privacy",
"Writing, Storytelling & Creativity",
"Games, Puzzles & Interactive Fiction",
"Education, Tutors & Learning",
"Image, Art & Design",
"Voice, Audio, Music & Speech",
"Other",
]
RESULTS_PER_PAGE = 8
SEARCH_RESULT_LIMIT = 1000
PAGINATION_SLOTS = 7
def _format_stats(stats: dict) -> str:
if not stats:
return "**Index stats:** unavailable"
def _format_stat_value(value: object) -> str:
return "n/a" if value is None else str(value)
category_counts = get_category_distribution()
category_lines = [f"- {name}: {count}" for name, count in category_counts.items()]
if not category_lines:
category_lines = ["- No categories indexed yet"]
return "\n".join(
[
"### Index stats",
f"- Listed: {_format_stat_value(stats.get('listed', 0))}",
f"- Private skipped: {_format_stat_value(stats.get('private_skipped', 0))}",
f"- README fetched: {_format_stat_value(stats.get('readme_fetched', 0))}",
f"- README missing: {_format_stat_value(stats.get('readme_missing', 0))}",
"",
"### Category distribution",
*category_lines,
]
)
def _render_stats() -> str:
return _format_stats(get_index_stats())
def _render_stats_html() -> str:
stats = get_index_stats()
category_counts = get_category_distribution()
def _display_value(value: object) -> str:
return "n/a" if value is None else f"{int(value):,}"
stat_rows = [
("Listed", stats.get("listed", 0), "๐", "stats-card__icon--listed"),
("README fetched", stats.get("readme_fetched", 0), "๐", "stats-card__icon--readme"),
("Private skipped", stats.get("private_skipped", 0), "๐", "stats-card__icon--private"),
("README missing", stats.get("readme_missing", 0), "โ ๏ธ", "stats-card__icon--missing"),
]
category_icons = {
"Developer Tools & Code Assistants": "โจ๏ธ",
"Productivity, Agents & Assistants": "๐ค",
"Voice, Audio, Music & Speech": "๐๏ธ",
"Image, Art & Design": "๐จ",
"Games, Puzzles & Interactive Fiction": "๐ฎ",
"Data, Research & Analysis": "๐",
"Search, RAG & Knowledge Tools": "๐",
"Education, Tutors & Learning": "๐",
"Writing, Storytelling & Creativity": "โ๏ธ",
"Community, Social & Collaboration": "๐ฅ",
"Security, Risk & Privacy": "๐ก๏ธ",
"Science, Math & Engineering": "โ๏ธ",
"Health, Wellness & Accessibility": "๐งก",
"Other": "โขโขโข",
}
stat_html = "".join(
f"""
{icon}
{label}
{_display_value(value)}
"""
for label, value, icon, icon_class in stat_rows
)
category_html = "".join(
f"""
{category_icons.get(name, 'โขโขโข')}
{name}
{int(count):,}
"""
for name, count in category_counts.items()
)
return f"""
Category distribution
{category_html}
"""
def _normalize_filter_text(value: str) -> str:
return " ".join(str(value or "").lower().split())
def _filter_spaces_by_text(spaces: list[dict], filter_text: str) -> list[dict]:
needle = _normalize_filter_text(filter_text)
if not needle:
return list(spaces)
filtered: list[dict] = []
for space in spaces:
name = _normalize_filter_text(space.get("name", ""))
if needle in name:
filtered.append(space)
return filtered
def _filtered_results_status(total: int, visible: int, filter_text: str) -> str:
filter_text = (filter_text or "").strip()
if not filter_text:
return f"Found {visible} recommendation(s)."
if visible == 0:
return f'No spaces match "{filter_text}".'
if visible == total:
return f'Filter "{filter_text}" matches all {visible} space(s).'
return f'Filtered to {visible} of {total} space(s) for "{filter_text}".'
def _render_filtered_results(
results: list[dict],
user_query: str,
liked_text: str,
filter_text: str,
page: int,
):
filtered_results = _filter_spaces_by_text(results, filter_text)
cards_html, page_state, prev_update, page_updates, next_update, page_info, targets, total_pages = _render_results_view(
filtered_results,
user_query,
liked_text,
page,
)
status = _filtered_results_status(len(results), len(filtered_results), filter_text)
return cards_html, filtered_results, status, page_state, prev_update, page_updates, next_update, page_info, targets, total_pages
def _render_filtered_results_bundle(
results: list[dict],
user_query: str,
liked_text: str,
filter_text: str,
page: int,
):
cards_html, filtered_results, status, page_state, prev_update, page_updates, next_update, page_info, targets, _ = _render_filtered_results(
results,
user_query,
liked_text,
filter_text,
page,
)
return cards_html, filtered_results, status, page_state, prev_update, *page_updates, next_update, page_info, targets
def _render_browse(category: str) -> str:
spaces = filter_public_spaces(browse_spaces(category or "All", limit=12))
title = category if category and category != "All" else "All Spaces"
header = (
f"
"
)
return header + render_category_section(spaces, category or "All")
def _category_pill_defs() -> list[tuple[str, str, int]]:
counts = get_category_distribution()
stats = get_index_stats()
total = int(stats.get("indexed_pool_size", 0) or sum(counts.values()))
pills = [("All", "All", total)]
pills.extend((name, name, int(counts.get(name, 0) or 0)) for name in CATEGORY_ORDER)
return pills
def _category_pill_kwargs(selected_category: str) -> list[dict]:
pills: list[dict] = []
for name, label, count in _category_pill_defs():
pills.append(
{
"value": label,
"variant": "primary" if name == selected_category else "secondary",
"elem_classes": ["category-pill-active"] if name == selected_category else ["category-pill"],
}
)
return pills
def _category_pill_updates(selected_category: str) -> list[dict]:
return [gr.update(**kwargs) for kwargs in _category_pill_kwargs(selected_category)]
def _total_pages(results: list[dict], page_size: int = RESULTS_PER_PAGE) -> int:
return max(1, math.ceil(len(results) / max(1, page_size)))
def _slice_page(results: list[dict], page: int, page_size: int = RESULTS_PER_PAGE) -> tuple[list[dict], int]:
total_pages = _total_pages(results, page_size)
safe_page = max(1, min(int(page or 1), total_pages))
start = (safe_page - 1) * page_size
return results[start : start + page_size], total_pages
def _page_targets(page: int, total_pages: int, slots: int = PAGINATION_SLOTS) -> list[int | str | None]:
if total_pages <= 0:
return [None] * slots
if total_pages <= slots:
targets = list(range(1, total_pages + 1))
targets.extend([None] * (slots - len(targets)))
return targets
if page <= 4:
return [1, 2, 3, 4, 5, "...", total_pages]
if page >= total_pages - 3:
return [1, "...", total_pages - 4, total_pages - 3, total_pages - 2, total_pages - 1, total_pages]
return [1, "...", page - 1, page, page + 1, "...", total_pages]
def _page_button_kwargs(target: int | str | None, current_page: int) -> dict:
if target is None:
return {
"value": "",
"visible": False,
"interactive": False,
"elem_classes": ["page-number"],
}
if target == "...":
return {
"value": "...",
"visible": True,
"interactive": False,
"elem_classes": ["page-ellipsis"],
"variant": "secondary",
}
return {
"value": str(target),
"visible": True,
"interactive": True,
"elem_classes": ["page-number-current"] if target == current_page else ["page-number"],
"variant": "primary" if target == current_page else "secondary",
}
def _page_button_state(target: int | str | None, current_page: int):
return gr.update(**_page_button_kwargs(target, current_page))
def _resolve_page_target(targets: list[int | str | None] | None, idx: int, fallback: int) -> int:
if not targets or idx >= len(targets):
return fallback
target = targets[idx]
return target if isinstance(target, int) else fallback
def _pagination_updates(page: int, total_pages: int):
targets = _page_targets(page, total_pages)
prev_update = gr.update(interactive=page > 1, visible=True)
next_update = gr.update(interactive=page < total_pages, visible=True)
button_updates = [_page_button_state(target, page) for target in targets]
page_info = ""
return prev_update, button_updates, next_update, page_info, targets
def _render_results_view(
results: list[dict],
user_query: str,
liked_text: str,
page: int,
):
show_reason = bool((user_query or "").strip() or (liked_text or "").strip())
public_results = filter_public_spaces(results)
page_items, total_pages = _slice_page(public_results, page)
cards_html = render_recommendation_cards(
page_items,
show_reason=show_reason,
source_query=(user_query or "").strip(),
)
prev_update, page_updates, next_update, page_info, targets = _pagination_updates(page, total_pages)
page_state = max(1, min(int(page or 1), total_pages))
return cards_html, page_state, prev_update, page_updates, next_update, page_info, targets, total_pages
def search_spaces(user_query: str, liked_text: str, category: str, top_k: int, filter_text: str = ""):
safe_category = category or "All"
results = recommend_spaces(
user_query or "",
liked_text or "",
category=safe_category,
top_k=SEARCH_RESULT_LIMIT,
)
browse_html = _render_browse(safe_category)
category_note = "" if safe_category == "All" else f" in {safe_category}"
base_status = "No strong match found for this query." if not results else f"Found {len(results)} recommendation(s){category_note}."
rendered = _render_filtered_results_bundle(results, user_query or "", liked_text or "", filter_text or "", 1)
return (
render_page_shell(get_index_stats(), active_category=safe_category, active_query=user_query or ""),
rendered[0],
_render_stats_html(),
browse_html,
base_status,
results,
rendered[1],
rendered[3],
rendered[4],
*rendered[5:12],
rendered[12],
rendered[13],
rendered[14],
safe_category,
*_category_pill_updates(safe_category),
)
def refresh_catalog(category: str, current_results: list[dict]):
refresh_spaces()
safe_category = category or "All"
browse_html = _render_browse(safe_category)
return render_page_shell(get_index_stats(), active_category=safe_category), _render_stats_html(), browse_html, "Index refreshed.", *_category_pill_updates(safe_category)
def load_homepage():
return search_spaces("", "", "All", 8, "")
def change_result_page(results: list[dict], user_query: str, liked_text: str, page: int):
cards_html, page_state, prev_update, page_updates, next_update, page_info, targets, _ = _render_results_view(
results or [],
user_query or "",
liked_text or "",
page,
)
status = f"Showing page {page_state}."
return (
cards_html,
status,
page_state,
prev_update,
*page_updates,
next_update,
page_info,
targets,
)
def _find_space(repo_id: str) -> dict | None:
repo_id = (repo_id or "").strip()
if not repo_id:
return None
try:
for item in load_spaces(force=False):
if str(item.get("id", "")).strip() == repo_id:
return item
except Exception:
logger.exception("Could not load spaces while resolving comment target")
return None
return None
def _annotate_comments_for_owner(comments: list[dict], owner_session_id: str) -> list[dict]:
owner_session_id = (owner_session_id or "").strip()
annotated: list[dict] = []
for comment in comments:
row = dict(comment)
row["can_edit"] = bool(owner_session_id and str(row.get("owner_session_id", "")).strip() == owner_session_id)
annotated.append(row)
return annotated
def _comment_payload(repo_id: str, owner_session_id: str = "") -> dict:
comments = get_recent_feedback(limit=200, repo_id=repo_id)
space = _find_space(repo_id)
comments = _annotate_comments_for_owner(comments, owner_session_id)
return {
"ok": True,
"repo_id": repo_id,
"space_title": str((space or {}).get("name", "")),
"count": len(comments),
"comments": comments,
}
def _save_repo_comment(repo_id: str, answer: str, source_query: str = "", owner_session_id: str = "") -> dict:
repo_id = (repo_id or "").strip()
answer = (answer or "").strip()
source_query = (source_query or "").strip()
owner_session_id = (owner_session_id or "").strip()
if not repo_id:
return {"ok": False, "detail": "Missing Space id.", "comments": [], "count": 0}
if not answer:
return {"ok": False, "detail": "Write a quick comment first.", "comments": [], "count": 0}
if len(answer) > 1200:
return {"ok": False, "detail": "Comment is too long. Keep it under 1200 characters.", "comments": [], "count": 0}
space = _find_space(repo_id)
if not space:
return {"ok": False, "detail": "Could not find that Space in the current index.", "comments": [], "count": 0}
save_feedback(
repo_id=repo_id,
space_title=str(space.get("name", "")),
track=str(space.get("track", "")),
question="Community comment",
answer=answer,
source_query=source_query,
owner_session_id=owner_session_id,
)
return _comment_payload(repo_id, owner_session_id)
def _get_comments_api(repo_id: str, owner_session_id: str = "") -> dict:
repo_id = (repo_id or "").strip()
if not repo_id:
return {"ok": False, "detail": "Missing Space id.", "comments": [], "count": 0}
return _comment_payload(repo_id, owner_session_id)
def _save_comment_api(repo_id: str, answer: str, source_query: str = "", owner_session_id: str = "") -> dict:
return _save_repo_comment(repo_id, answer, source_query, owner_session_id)
def _update_comment_api(repo_id: str, comment_id: int, answer: str, owner_session_id: str = "") -> dict:
repo_id = (repo_id or "").strip()
owner_session_id = (owner_session_id or "").strip()
if not repo_id:
return {"ok": False, "detail": "Missing Space id.", "comments": [], "count": 0}
if not update_feedback_comment(comment_id, owner_session_id, answer):
return {"ok": False, "detail": "Could not update comment.", "comments": [], "count": 0}
return _comment_payload(repo_id, owner_session_id)
def _delete_comment_api(repo_id: str, comment_id: int, owner_session_id: str = "") -> dict:
repo_id = (repo_id or "").strip()
owner_session_id = (owner_session_id or "").strip()
if not repo_id:
return {"ok": False, "detail": "Missing Space id.", "comments": [], "count": 0}
if not delete_feedback_comment(comment_id, owner_session_id):
return {"ok": False, "detail": "Could not delete comment.", "comments": [], "count": 0}
return _comment_payload(repo_id, owner_session_id)
def _recommend_button_update(query_text: str, liked_text: str):
enabled = bool((query_text or "").strip() or (liked_text or "").strip())
return gr.update(interactive=enabled)
def build_app():
initial_category = "All"
initial_results = recommend_spaces("", "", category=initial_category, top_k=SEARCH_RESULT_LIMIT)
initial_cards, initial_page_state, initial_prev, initial_page_updates, initial_next, initial_page_info, initial_targets, initial_total_pages = _render_results_view(
initial_results,
"",
"",
1,
)
with gr.Blocks(title="Hugging Face Spaces Recommender", analytics_enabled=False) as demo:
gr.api(
_get_comments_api,
api_name="comments_get",
api_description=False,
queue=False,
api_visibility="undocumented",
)
gr.api(
_save_comment_api,
api_name="comments_save",
api_description=False,
queue=False,
api_visibility="undocumented",
)
gr.api(
_update_comment_api,
api_name="comments_update",
api_description=False,
queue=False,
api_visibility="undocumented",
)
gr.api(
_delete_comment_api,
api_name="comments_delete",
api_description=False,
queue=False,
api_visibility="undocumented",
)
shell_html = gr.HTML(render_page_shell(get_index_stats(), active_category=initial_category), sanitize_html=False)
gr.HTML(COMMENTS_MODAL_HTML, sanitize_html=False)
stats_panel = gr.HTML(_render_stats_html(), sanitize_html=False)
status_md = gr.Markdown("Ready.", elem_classes="status-line")
base_results_state = gr.State(initial_results)
results_state = gr.State(initial_results)
page_state = gr.State(initial_page_state)
page_targets_state = gr.State(initial_targets)
category_state = gr.State(initial_category)
with gr.Row(elem_classes="search-panels"):
with gr.Column(scale=1, min_width=0):
query = gr.Textbox(
label="What kind of Space are you looking for?",
placeholder="Example: kid-friendly app for learning with quizzes",
lines=2,
elem_id="search-query",
scale=1,
)
with gr.Column(scale=1, min_width=0):
liked_text = gr.Textbox(
label="What did you like about another app? Optional.",
placeholder="Optional: magical UI, playful interaction, clear explanations, or anything else you want more of",
lines=2,
elem_id="liked-text",
scale=1,
)
with gr.Row(elem_classes="action-row"):
recommend_btn = gr.Button("Recommend Spaces", variant="primary", elem_id="recommend-btn", interactive=False)
with gr.Row(elem_classes="category-strip"):
category_buttons = []
for kwargs in _category_pill_kwargs(initial_category):
category_buttons.append(gr.Button(**kwargs))
with gr.Row(elem_classes="filter-row"):
with gr.Column(scale=1, min_width=0):
space_filter = gr.Textbox(
label="Filter these Spaces",
placeholder="Type to filter by space name",
lines=1,
max_lines=1,
elem_id="spaces-filter",
scale=1,
)
recommendations = gr.HTML(
initial_cards,
sanitize_html=False,
)
with gr.Row(elem_classes="pagination-shell"):
prev_page_btn = gr.Button("< Prev", elem_classes="page-nav", interactive=bool(initial_page_state > 1))
page_buttons = []
for target in initial_targets:
page_buttons.append(gr.Button(**_page_button_kwargs(target, initial_page_state)))
next_page_btn = gr.Button("Next >", elem_classes="page-nav", interactive=bool(initial_page_state < initial_total_pages))
page_info = gr.Markdown(initial_page_info, elem_classes="pagination-info")
browse_html = gr.HTML(_render_browse("All"), sanitize_html=False, visible=False)
gr.HTML(render_footer_shell(), sanitize_html=False)
recommend_btn.click(
lambda q, l, c, f: (gr.update(interactive=True), *search_spaces(q, l, c, 8, f)),
inputs=[query, liked_text, category_state, space_filter],
js="(q, l, c, f) => { window.bsqfDisableRecommendButton && window.bsqfDisableRecommendButton(); return [q, l, c, f]; }",
outputs=[
recommend_btn,
shell_html,
recommendations,
stats_panel,
browse_html,
status_md,
base_results_state,
results_state,
page_state,
prev_page_btn,
*page_buttons,
next_page_btn,
page_info,
page_targets_state,
category_state,
*category_buttons,
],
)
query.change(
_recommend_button_update,
inputs=[query, liked_text],
outputs=[recommend_btn],
)
liked_text.change(
_recommend_button_update,
inputs=[query, liked_text],
outputs=[recommend_btn],
)
space_filter.input(
lambda results, q, l, f: _render_filtered_results_bundle(results or [], q, l, f, 1),
inputs=[base_results_state, query, liked_text, space_filter],
outputs=[
recommendations,
results_state,
status_md,
page_state,
prev_page_btn,
*page_buttons,
next_page_btn,
page_info,
page_targets_state,
],
)
space_filter.change(
lambda results, q, l, f: _render_filtered_results_bundle(results or [], q, l, f, 1),
inputs=[base_results_state, query, liked_text, space_filter],
outputs=[
recommendations,
results_state,
status_md,
page_state,
prev_page_btn,
*page_buttons,
next_page_btn,
page_info,
page_targets_state,
],
)
demo.load(
lambda: search_spaces("", "", "All", 8, ""),
outputs=[
shell_html,
recommendations,
stats_panel,
browse_html,
status_md,
base_results_state,
results_state,
page_state,
prev_page_btn,
*page_buttons,
next_page_btn,
page_info,
page_targets_state,
category_state,
*category_buttons,
],
)
query.submit(
lambda q, l, c, f: (gr.update(interactive=True), *search_spaces(q, l, c, 8, f)),
inputs=[query, liked_text, category_state, space_filter],
js="(q, l, c, f) => { window.bsqfDisableRecommendButton && window.bsqfDisableRecommendButton(); return [q, l, c, f]; }",
outputs=[
recommend_btn,
shell_html,
recommendations,
stats_panel,
browse_html,
status_md,
base_results_state,
results_state,
page_state,
prev_page_btn,
*page_buttons,
next_page_btn,
page_info,
page_targets_state,
category_state,
*category_buttons,
],
)
prev_page_btn.click(
lambda results, q, l, page: change_result_page(results, q, l, int(page or 1) - 1),
inputs=[results_state, query, liked_text, page_state],
outputs=[recommendations, status_md, page_state, prev_page_btn, *page_buttons, next_page_btn, page_info, page_targets_state],
)
next_page_btn.click(
lambda results, q, l, page: change_result_page(results, q, l, int(page or 1) + 1),
inputs=[results_state, query, liked_text, page_state],
outputs=[recommendations, status_md, page_state, prev_page_btn, *page_buttons, next_page_btn, page_info, page_targets_state],
)
for index, button in enumerate(page_buttons):
button.click(
lambda results, q, l, targets, page, idx=index: change_result_page(
results,
q,
l,
_resolve_page_target(targets, idx, int(page or 1)),
),
inputs=[results_state, query, liked_text, page_targets_state, page_state],
outputs=[recommendations, status_md, page_state, prev_page_btn, *page_buttons, next_page_btn, page_info, page_targets_state],
)
for (category_name, _, _count), button in zip(_category_pill_defs(), category_buttons):
button.click(
lambda q, l, f, name=category_name: search_spaces(q, l, name, 8, f),
inputs=[query, liked_text, space_filter],
outputs=[
shell_html,
recommendations,
stats_panel,
browse_html,
status_md,
base_results_state,
results_state,
page_state,
prev_page_btn,
*page_buttons,
next_page_btn,
page_info,
page_targets_state,
category_state,
*category_buttons,
],
)
return demo
def _find_launch_port(preferred_port: int, max_tries: int = 10) -> int:
start = max(1, int(preferred_port))
for port in range(start, start + max_tries):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind(("127.0.0.1", port))
except OSError:
continue
return port
return start
init_db()
if __name__ == "__main__":
demo = build_app()
launch_port = _find_launch_port(int(os.getenv("GRADIO_SERVER_PORT", "7860")))
demo.launch(
server_name="0.0.0.0",
server_port=launch_port,
css=APP_CSS,
head=COMMENTS_MODAL_SCRIPT + FIELD_GUIDE_SHELL_SCRIPT,
allowed_paths=[str(Path(__file__).resolve().parent)],
ssr_mode=False,
enable_monitoring=False,
)