"""Gradio front end for the Hugging Face Spaces recommender.

The module builds a single ``gr.Blocks`` app with a search form, category
pills, a name filter, paginated recommendation cards and four undocumented
JSON endpoints used by the comments modal.
"""

from __future__ import annotations

import logging
import math
import os
import socket
import warnings
from pathlib import Path

# Set before gradio is imported (presumably so the opt-out is already in place
# when gradio initialises -- confirm against gradio's analytics docs).
os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "False")

import gradio as gr

from backend.recommender import browse_spaces, recommend_spaces, refresh_spaces
from backend.spaces_index import (
    CATEGORY_ORDER,
    filter_public_spaces,
    get_category_distribution,
    get_index_stats,
    load_spaces,
)
from backend.storage import (
    delete_feedback_comment,
    get_recent_feedback,
    init_db,
    save_feedback,
    update_feedback_comment,
)
from ui.cards import (
    COMMENTS_MODAL_HTML,
    COMMENTS_MODAL_SCRIPT,
    FIELD_GUIDE_SHELL_SCRIPT,
    render_category_section,
    render_footer_shell,
    render_page_shell,
    render_recommendation_cards,
)
from ui.styles import APP_CSS

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("httpx").setLevel(logging.WARNING)
warnings.filterwarnings("ignore", message=".*HTTP_422_UNPROCESSABLE_ENTITY.*")

CATEGORY_CHOICES = ["All"] + [category for category in CATEGORY_ORDER if category != "Other"] + ["Other"]
FEATURED_CATEGORIES = [
    "Security, Risk & Privacy",
    "Writing, Storytelling & Creativity",
    "Games, Puzzles & Interactive Fiction",
    "Education, Tutors & Learning",
    "Image, Art & Design",
    "Voice, Audio, Music & Speech",
    "Other",
]
RESULTS_PER_PAGE = 8
SEARCH_RESULT_LIMIT = 1000
PAGINATION_SLOTS = 7
MAX_COMMENT_LENGTH = 1200

# Icons are written as escapes so the file survives being re-saved in a
# non-UTF-8 editor.
_FALLBACK_CATEGORY_ICON = "\u2022\u2022\u2022"  # three bullets
_CATEGORY_ICONS = {
    "Developer Tools & Code Assistants": "\u2328\ufe0f",  # keyboard
    "Productivity, Agents & Assistants": "\U0001F916",  # robot face
    "Voice, Audio, Music & Speech": "\U0001F399\ufe0f",  # studio microphone
    "Image, Art & Design": "\U0001F3A8",  # artist palette
    "Games, Puzzles & Interactive Fiction": "\U0001F3AE",  # video game
    "Data, Research & Analysis": "\U0001F4CA",  # bar chart
    "Search, RAG & Knowledge Tools": "\U0001F50E",  # magnifying glass
    "Education, Tutors & Learning": "\U0001F393",  # graduation cap
    "Writing, Storytelling & Creativity": "\u270d\ufe0f",  # writing hand
    "Community, Social & Collaboration": "\U0001F465",  # busts in silhouette
    "Security, Risk & Privacy": "\U0001F6E1\ufe0f",  # shield
    "Science, Math & Engineering": "\u2697\ufe0f",  # alembic
    "Health, Wellness & Accessibility": "\U0001F9E1",  # orange heart
    "Other": _FALLBACK_CATEGORY_ICON,
}


# ---------------------------------------------------------------------------
# Index statistics
# ---------------------------------------------------------------------------


def _format_stats(stats: dict) -> str:
    """Render index stats plus the category distribution as Markdown.

    Returns a short "unavailable" line when ``stats`` is empty or ``None``.
    """
    if not stats:
        return "**Index stats:** unavailable"

    def _format_stat_value(value: object) -> str:
        return "n/a" if value is None else str(value)

    category_lines = [f"- {name}: {count}" for name, count in get_category_distribution().items()]
    if not category_lines:
        category_lines = ["- No categories indexed yet"]

    return "\n".join(
        [
            "### Index stats",
            f"- Listed: {_format_stat_value(stats.get('listed', 0))}",
            f"- Private skipped: {_format_stat_value(stats.get('private_skipped', 0))}",
            f"- README fetched: {_format_stat_value(stats.get('readme_fetched', 0))}",
            f"- README missing: {_format_stat_value(stats.get('readme_missing', 0))}",
            "",
            "### Category distribution",
            *category_lines,
        ]
    )


def _render_stats() -> str:
    """Return the Markdown stats panel for the current index."""
    return _format_stats(get_index_stats())


def _render_stats_html() -> str:
    """Return the stats panel (index counters and per-category counts) as text for ``gr.HTML``."""
    stats = get_index_stats()
    category_counts = get_category_distribution()

    def _display_value(value: object) -> str:
        return "n/a" if value is None else f"{int(value):,}"

    # (label, value, icon, css class for the icon)
    stat_rows = [
        ("Listed", stats.get("listed", 0), "\U0001F4CB", "stats-card__icon--listed"),
        ("README fetched", stats.get("readme_fetched", 0), "\U0001F4D6", "stats-card__icon--readme"),
        ("Private skipped", stats.get("private_skipped", 0), "\U0001F512", "stats-card__icon--private"),
        ("README missing", stats.get("readme_missing", 0), "\u26a0\ufe0f", "stats-card__icon--missing"),
    ]

    # NOTE(review): the templates below carry no markup in the reviewed copy and
    # the per-row css class is never interpolated; the wrapping elements look
    # like they were lost. Confirm against the styles in ui.styles and restore.
    stat_html = "".join(
        f"\n{icon}\n{label}\n{_display_value(value)}\n"
        for label, value, icon, _icon_class in stat_rows
    )
    category_html = "".join(
        f"\n{_CATEGORY_ICONS.get(name, _FALLBACK_CATEGORY_ICON)}\n{name}\n{int(count):,}\n"
        for name, count in category_counts.items()
    )
    return f"\nIndex stats\n{stat_html}\nCategory distribution\n{category_html}\n"


# ---------------------------------------------------------------------------
# Name filter
# ---------------------------------------------------------------------------


def _normalize_filter_text(value: str) -> str:
    """Lower-case ``value`` and collapse all whitespace runs to single spaces."""
    return " ".join(str(value or "").lower().split())


def _filter_spaces_by_text(spaces: list[dict], filter_text: str) -> list[dict]:
    """Return the spaces whose normalised ``name`` contains the normalised filter.

    An empty filter returns a shallow copy of ``spaces``.
    """
    needle = _normalize_filter_text(filter_text)
    if not needle:
        return list(spaces)
    return [space for space in spaces if needle in _normalize_filter_text(space.get("name", ""))]


def _filtered_results_status(total: int, visible: int, filter_text: str) -> str:
    """Build the status line describing how many of ``total`` spaces pass the filter."""
    filter_text = (filter_text or "").strip()
    if not filter_text:
        return f"Found {visible} recommendation(s)."
    if visible == 0:
        return f'No spaces match "{filter_text}".'
    if visible == total:
        return f'Filter "{filter_text}" matches all {visible} space(s).'
    return f'Filtered to {visible} of {total} space(s) for "{filter_text}".'


def _render_filtered_results(
    results: list[dict],
    user_query: str,
    liked_text: str,
    filter_text: str,
    page: int,
):
    """Apply the name filter to ``results`` and render the requested page.

    Returns ``(cards_html, filtered_results, status, page_state, prev_update,
    page_updates, next_update, page_info, targets, total_pages)`` where
    ``page_updates`` is a list with one update per pagination button.
    """
    filtered_results = _filter_spaces_by_text(results, filter_text)
    (
        cards_html,
        page_state,
        prev_update,
        page_updates,
        next_update,
        page_info,
        targets,
        total_pages,
    ) = _render_results_view(filtered_results, user_query, liked_text, page)
    status = _filtered_results_status(len(results), len(filtered_results), filter_text)
    return (
        cards_html,
        filtered_results,
        status,
        page_state,
        prev_update,
        page_updates,
        next_update,
        page_info,
        targets,
        total_pages,
    )


def _render_filtered_results_bundle(
    results: list[dict],
    user_query: str,
    liked_text: str,
    filter_text: str,
    page: int,
):
    """Same as ``_render_filtered_results`` but flattened for Gradio outputs.

    The per-button updates are spliced in place and ``total_pages`` is dropped.
    """
    (
        cards_html,
        filtered_results,
        status,
        page_state,
        prev_update,
        page_updates,
        next_update,
        page_info,
        targets,
        _total,
    ) = _render_filtered_results(results, user_query, liked_text, filter_text, page)
    return (
        cards_html,
        filtered_results,
        status,
        page_state,
        prev_update,
        *page_updates,
        next_update,
        page_info,
        targets,
    )


# ---------------------------------------------------------------------------
# Browse panel and category pills
# ---------------------------------------------------------------------------


def _render_browse(category: str) -> str:
    """Render the browse section for ``category`` (falls back to "All")."""
    spaces = filter_public_spaces(browse_spaces(category or "All", limit=12))
    title = category if category and category != "All" else "All Spaces"
    # NOTE(review): as in the stats panel, no markup is present around these
    # headings in the reviewed copy -- confirm and restore if it was lost.
    header = (
        f"\n\nBrowse {title}\n\n"
        "\n\nSpaces in the current index that fit this category.\n\n"
    )
    return header + render_category_section(spaces, category or "All")


def _category_pill_defs() -> list[tuple[str, str, int]]:
    """Return ``(name, label, count)`` for "All" followed by every category in order."""
    counts = get_category_distribution()
    stats = get_index_stats()
    # Prefer the index's own pool size; fall back to the sum of category counts.
    total = int(stats.get("indexed_pool_size", 0) or sum(counts.values()))
    pills = [("All", "All", total)]
    pills.extend((name, name, int(counts.get(name, 0) or 0)) for name in CATEGORY_ORDER)
    return pills


def _category_pill_kwargs(selected_category: str) -> list[dict]:
    """Return ``gr.Button`` keyword arguments for each pill, marking the selected one."""
    pills: list[dict] = []
    for name, label, _count in _category_pill_defs():
        is_selected = name == selected_category
        pills.append(
            {
                "value": label,
                "variant": "primary" if is_selected else "secondary",
                "elem_classes": ["category-pill-active"] if is_selected else ["category-pill"],
            }
        )
    return pills


def _category_pill_updates(selected_category: str) -> list[dict]:
    """Return one ``gr.update`` per category pill for the given selection."""
    return [gr.update(**kwargs) for kwargs in _category_pill_kwargs(selected_category)]


# ---------------------------------------------------------------------------
# Pagination
# ---------------------------------------------------------------------------


def _total_pages(results: list[dict], page_size: int = RESULTS_PER_PAGE) -> int:
    """Return the number of pages needed for ``results`` (never less than 1)."""
    return max(1, math.ceil(len(results) / max(1, page_size)))


def _slice_page(results: list[dict], page: int, page_size: int = RESULTS_PER_PAGE) -> tuple[list[dict], int]:
    """Return ``(items on the clamped page, total_pages)``."""
    total_pages = _total_pages(results, page_size)
    safe_page = max(1, min(int(page or 1), total_pages))
    start = (safe_page - 1) * page_size
    return results[start : start + page_size], total_pages


def _page_targets(page: int, total_pages: int, slots: int = PAGINATION_SLOTS) -> list[int | str | None]:
    """Decide what each pagination button shows.

    Each entry is a page number, ``"..."`` for a gap, or ``None`` for a hidden
    slot. When ``total_pages`` exceeds ``slots`` the layouts below always have
    seven entries, i.e. they assume ``slots == 7``.
    """
    if total_pages <= 0:
        return [None] * slots
    if total_pages <= slots:
        targets: list[int | str | None] = list(range(1, total_pages + 1))
        targets.extend([None] * (slots - len(targets)))
        return targets
    if page <= 4:
        return [1, 2, 3, 4, 5, "...", total_pages]
    if page >= total_pages - 3:
        return [1, "...", total_pages - 4, total_pages - 3, total_pages - 2, total_pages - 1, total_pages]
    return [1, "...", page - 1, page, page + 1, "...", total_pages]


def _page_button_kwargs(target: int | str | None, current_page: int) -> dict:
    """Return button keyword arguments for one pagination slot."""
    if target is None:
        return {
            "value": "",
            "visible": False,
            "interactive": False,
            "elem_classes": ["page-number"],
        }
    if target == "...":
        return {
            "value": "...",
            "visible": True,
            "interactive": False,
            "elem_classes": ["page-ellipsis"],
            "variant": "secondary",
        }
    is_current = target == current_page
    return {
        "value": str(target),
        "visible": True,
        "interactive": True,
        "elem_classes": ["page-number-current"] if is_current else ["page-number"],
        "variant": "primary" if is_current else "secondary",
    }


def _page_button_state(target: int | str | None, current_page: int):
    """Return the ``gr.update`` for one pagination slot."""
    return gr.update(**_page_button_kwargs(target, current_page))


def _resolve_page_target(targets: list[int | str | None] | None, idx: int, fallback: int) -> int:
    """Return the page number stored in slot ``idx``, or ``fallback`` for gaps/hidden slots."""
    if not targets or idx >= len(targets):
        return fallback
    target = targets[idx]
    return target if isinstance(target, int) else fallback


def _pagination_updates(page: int, total_pages: int):
    """Return ``(prev_update, button_updates, next_update, page_info, targets)`` for ``page``."""
    targets = _page_targets(page, total_pages)
    prev_update = gr.update(interactive=page > 1, visible=True)
    next_update = gr.update(interactive=page < total_pages, visible=True)
    button_updates = [_page_button_state(target, page) for target in targets]
    page_info = ""
    return prev_update, button_updates, next_update, page_info, targets


def _render_results_view(
    results: list[dict],
    user_query: str,
    liked_text: str,
    page: int,
):
    """Render one page of recommendation cards together with its pagination state.

    Returns ``(cards_html, page_state, prev_update, page_updates, next_update,
    page_info, targets, total_pages)``. ``page_state`` is ``page`` clamped to
    ``1..total_pages``.
    """
    show_reason = bool((user_query or "").strip() or (liked_text or "").strip())
    public_results = filter_public_spaces(results)
    page_items, total_pages = _slice_page(public_results, page)
    # Clamp before building the controls so the highlighted button and the
    # prev/next state describe the page that was actually rendered.
    page_state = max(1, min(int(page or 1), total_pages))
    cards_html = render_recommendation_cards(
        page_items,
        show_reason=show_reason,
        source_query=(user_query or "").strip(),
    )
    prev_update, page_updates, next_update, page_info, targets = _pagination_updates(page_state, total_pages)
    return cards_html, page_state, prev_update, page_updates, next_update, page_info, targets, total_pages


# ---------------------------------------------------------------------------
# Top-level UI actions
# ---------------------------------------------------------------------------


def search_spaces(user_query: str, liked_text: str, category: str, top_k: int, filter_text: str = ""):
    """Run a recommendation search and return every output the search events update.

    ``top_k`` is accepted for interface compatibility but not used: the search
    always requests ``SEARCH_RESULT_LIMIT`` results and pages them client side.

    The returned tuple is ordered as: page shell, cards, stats panel, browse
    panel, status, unfiltered results, filtered results, page number, prev
    button, one update per page button, next button, page info, page targets,
    category, one update per category pill.
    """
    safe_category = category or "All"
    results = recommend_spaces(
        user_query or "",
        liked_text or "",
        category=safe_category,
        top_k=SEARCH_RESULT_LIMIT,
    )
    browse_html = _render_browse(safe_category)

    (
        cards_html,
        filtered_results,
        filter_status,
        page_state,
        prev_update,
        page_updates,
        next_update,
        page_info,
        targets,
        _total,
    ) = _render_filtered_results(results, user_query or "", liked_text or "", filter_text or "", 1)

    category_note = "" if safe_category == "All" else f" in {safe_category}"
    if not results:
        status = "No strong match found for this query."
    elif (filter_text or "").strip():
        # A name filter is active, so report what is actually on screen.
        status = filter_status
    else:
        status = f"Found {len(results)} recommendation(s){category_note}."

    return (
        render_page_shell(get_index_stats(), active_category=safe_category, active_query=user_query or ""),
        cards_html,
        _render_stats_html(),
        browse_html,
        status,
        results,
        filtered_results,
        page_state,
        prev_update,
        *page_updates,
        next_update,
        page_info,
        targets,
        safe_category,
        *_category_pill_updates(safe_category),
    )


def refresh_catalog(category: str, current_results: list[dict]):
    """Refresh the index and re-render shell, stats, browse panel and pills.

    ``current_results`` is accepted but not used.
    """
    refresh_spaces()
    safe_category = category or "All"
    browse_html = _render_browse(safe_category)
    return (
        render_page_shell(get_index_stats(), active_category=safe_category),
        _render_stats_html(),
        browse_html,
        "Index refreshed.",
        *_category_pill_updates(safe_category),
    )


def load_homepage():
    """Return the outputs for the initial, query-less "All" view."""
    return search_spaces("", "", "All", RESULTS_PER_PAGE, "")


def change_result_page(results: list[dict], user_query: str, liked_text: str, page: int):
    """Render ``page`` of ``results`` and return the outputs the paging events update."""
    (
        cards_html,
        page_state,
        prev_update,
        page_updates,
        next_update,
        page_info,
        targets,
        _total,
    ) = _render_results_view(results or [], user_query or "", liked_text or "", page)
    status = f"Showing page {page_state}."
    return (
        cards_html,
        status,
        page_state,
        prev_update,
        *page_updates,
        next_update,
        page_info,
        targets,
    )


# ---------------------------------------------------------------------------
# Comments API
# ---------------------------------------------------------------------------


def _find_space(repo_id: str) -> dict | None:
    """Return the indexed space whose ``id`` equals ``repo_id``, or ``None``.

    Failures while loading the index are logged and reported as "not found".
    """
    repo_id = (repo_id or "").strip()
    if not repo_id:
        return None
    try:
        for item in load_spaces(force=False):
            if str(item.get("id", "")).strip() == repo_id:
                return item
    except Exception:
        # Best effort: the comment endpoints must answer even if the index is broken.
        logger.exception("Could not load spaces while resolving comment target")
        return None
    return None


def _annotate_comments_for_owner(comments: list[dict], owner_session_id: str) -> list[dict]:
    """Return copies of ``comments`` with ``can_edit`` set for the caller's own rows."""
    owner_session_id = (owner_session_id or "").strip()
    annotated: list[dict] = []
    for comment in comments:
        row = dict(comment)
        row["can_edit"] = bool(
            owner_session_id and str(row.get("owner_session_id", "")).strip() == owner_session_id
        )
        annotated.append(row)
    return annotated


def _comment_error(detail: str) -> dict:
    """Return the failure payload shared by all comment endpoints."""
    return {"ok": False, "detail": detail, "comments": [], "count": 0}


def _validate_comment_text(answer: str) -> str | None:
    """Return an error message for an already-stripped comment, or ``None`` if it is acceptable."""
    if not answer:
        return "Write a quick comment first."
    if len(answer) > MAX_COMMENT_LENGTH:
        return "Comment is too long. Keep it under 1200 characters."
    return None


def _comment_payload(repo_id: str, owner_session_id: str = "") -> dict:
    """Return the success payload with up to 200 recent comments for ``repo_id``."""
    comments = get_recent_feedback(limit=200, repo_id=repo_id)
    space = _find_space(repo_id)
    comments = _annotate_comments_for_owner(comments, owner_session_id)
    return {
        "ok": True,
        "repo_id": repo_id,
        "space_title": str((space or {}).get("name", "")),
        "count": len(comments),
        "comments": comments,
    }


def _save_repo_comment(repo_id: str, answer: str, source_query: str = "", owner_session_id: str = "") -> dict:
    """Validate and store a comment, then return the refreshed comment payload."""
    repo_id = (repo_id or "").strip()
    answer = (answer or "").strip()
    source_query = (source_query or "").strip()
    owner_session_id = (owner_session_id or "").strip()

    if not repo_id:
        return _comment_error("Missing Space id.")
    problem = _validate_comment_text(answer)
    if problem:
        return _comment_error(problem)

    space = _find_space(repo_id)
    if not space:
        return _comment_error("Could not find that Space in the current index.")

    save_feedback(
        repo_id=repo_id,
        space_title=str(space.get("name", "")),
        track=str(space.get("track", "")),
        question="Community comment",
        answer=answer,
        source_query=source_query,
        owner_session_id=owner_session_id,
    )
    return _comment_payload(repo_id, owner_session_id)


def _get_comments_api(repo_id: str, owner_session_id: str = "") -> dict:
    """API: list comments for a Space."""
    repo_id = (repo_id or "").strip()
    if not repo_id:
        return _comment_error("Missing Space id.")
    return _comment_payload(repo_id, owner_session_id)


def _save_comment_api(repo_id: str, answer: str, source_query: str = "", owner_session_id: str = "") -> dict:
    """API: add a comment to a Space."""
    return _save_repo_comment(repo_id, answer, source_query, owner_session_id)


def _update_comment_api(repo_id: str, comment_id: int, answer: str, owner_session_id: str = "") -> dict:
    """API: edit a comment; the text goes through the same checks as a new comment."""
    repo_id = (repo_id or "").strip()
    answer = (answer or "").strip()
    owner_session_id = (owner_session_id or "").strip()
    if not repo_id:
        return _comment_error("Missing Space id.")
    problem = _validate_comment_text(answer)
    if problem:
        return _comment_error(problem)
    if not update_feedback_comment(comment_id, owner_session_id, answer):
        return _comment_error("Could not update comment.")
    return _comment_payload(repo_id, owner_session_id)


def _delete_comment_api(repo_id: str, comment_id: int, owner_session_id: str = "") -> dict:
    """API: delete a comment."""
    repo_id = (repo_id or "").strip()
    owner_session_id = (owner_session_id or "").strip()
    if not repo_id:
        return _comment_error("Missing Space id.")
    if not delete_feedback_comment(comment_id, owner_session_id):
        return _comment_error("Could not delete comment.")
    return _comment_payload(repo_id, owner_session_id)


# ---------------------------------------------------------------------------
# App assembly
# ---------------------------------------------------------------------------


def _recommend_button_update(query_text: str, liked_text: str):
    """Enable the recommend button only when at least one text box has content."""
    enabled = bool((query_text or "").strip() or (liked_text or "").strip())
    return gr.update(interactive=enabled)


def build_app():
    """Create the ``gr.Blocks`` app, wire all events and return it (not launched)."""
    initial_category = "All"
    initial_results = recommend_spaces("", "", category=initial_category, top_k=SEARCH_RESULT_LIMIT)
    (
        initial_cards,
        initial_page_state,
        _initial_prev,
        _initial_page_updates,
        _initial_next,
        initial_page_info,
        initial_targets,
        initial_total_pages,
    ) = _render_results_view(initial_results, "", "", 1)

    with gr.Blocks(title="Hugging Face Spaces Recommender", analytics_enabled=False) as demo:
        # JSON endpoints used by the comments modal.
        comment_endpoints = (
            (_get_comments_api, "comments_get"),
            (_save_comment_api, "comments_save"),
            (_update_comment_api, "comments_update"),
            (_delete_comment_api, "comments_delete"),
        )
        for endpoint, endpoint_name in comment_endpoints:
            gr.api(
                endpoint,
                api_name=endpoint_name,
                api_description=False,
                queue=False,
                api_visibility="undocumented",
            )

        shell_html = gr.HTML(
            render_page_shell(get_index_stats(), active_category=initial_category),
            sanitize_html=False,
        )
        gr.HTML(COMMENTS_MODAL_HTML, sanitize_html=False)
        stats_panel = gr.HTML(_render_stats_html(), sanitize_html=False)
        status_md = gr.Markdown("Ready.", elem_classes="status-line")

        # base_results_state keeps the unfiltered search results; results_state
        # keeps what is left after the name filter and is what paging walks.
        base_results_state = gr.State(initial_results)
        results_state = gr.State(initial_results)
        page_state = gr.State(initial_page_state)
        page_targets_state = gr.State(initial_targets)
        category_state = gr.State(initial_category)

        with gr.Row(elem_classes="search-panels"):
            with gr.Column(scale=1, min_width=0):
                query = gr.Textbox(
                    label="What kind of Space are you looking for?",
                    placeholder="Example: kid-friendly app for learning with quizzes",
                    lines=2,
                    elem_id="search-query",
                    scale=1,
                )
            with gr.Column(scale=1, min_width=0):
                liked_text = gr.Textbox(
                    label="What did you like about another app? Optional.",
                    placeholder="Optional: magical UI, playful interaction, clear explanations, or anything else you want more of",
                    lines=2,
                    elem_id="liked-text",
                    scale=1,
                )

        with gr.Row(elem_classes="action-row"):
            recommend_btn = gr.Button(
                "Recommend Spaces",
                variant="primary",
                elem_id="recommend-btn",
                interactive=False,
            )

        with gr.Row(elem_classes="category-strip"):
            category_buttons = [gr.Button(**kwargs) for kwargs in _category_pill_kwargs(initial_category)]

        with gr.Row(elem_classes="filter-row"):
            with gr.Column(scale=1, min_width=0):
                space_filter = gr.Textbox(
                    label="Filter these Spaces",
                    placeholder="Type to filter by space name",
                    lines=1,
                    max_lines=1,
                    elem_id="spaces-filter",
                    scale=1,
                )

        recommendations = gr.HTML(initial_cards, sanitize_html=False)

        with gr.Row(elem_classes="pagination-shell"):
            prev_page_btn = gr.Button(
                "< Prev",
                elem_classes="page-nav",
                interactive=bool(initial_page_state > 1),
            )
            page_buttons = [
                gr.Button(**_page_button_kwargs(target, initial_page_state)) for target in initial_targets
            ]
            next_page_btn = gr.Button(
                "Next >",
                elem_classes="page-nav",
                interactive=bool(initial_page_state < initial_total_pages),
            )
        page_info = gr.Markdown(initial_page_info, elem_classes="pagination-info")

        browse_html = gr.HTML(_render_browse("All"), sanitize_html=False, visible=False)
        gr.HTML(render_footer_shell(), sanitize_html=False)

        # Output lists shared by several events; the order must match the
        # tuples returned by search_spaces / the filter bundle / change_result_page.
        pager_outputs = [prev_page_btn, *page_buttons, next_page_btn, page_info, page_targets_state]
        search_outputs = [
            shell_html,
            recommendations,
            stats_panel,
            browse_html,
            status_md,
            base_results_state,
            results_state,
            page_state,
            *pager_outputs,
            category_state,
            *category_buttons,
        ]
        filter_outputs = [recommendations, results_state, status_md, page_state, *pager_outputs]
        paging_outputs = [recommendations, status_md, page_state, *pager_outputs]

        search_inputs = [query, liked_text, category_state, space_filter]
        disable_button_js = (
            "(q, l, c, f) => { window.bsqfDisableRecommendButton && window.bsqfDisableRecommendButton(); return [q, l, c, f]; }"
        )

        def _run_search(user_query, liked, category, filter_text):
            # The first output re-enables the button that the JS hook disabled.
            return (
                gr.update(interactive=True),
                *search_spaces(user_query, liked, category, RESULTS_PER_PAGE, filter_text),
            )

        def _apply_filter(results, user_query, liked, filter_text):
            return _render_filtered_results_bundle(results or [], user_query, liked, filter_text, 1)

        recommend_btn.click(
            _run_search,
            inputs=search_inputs,
            js=disable_button_js,
            outputs=[recommend_btn, *search_outputs],
        )
        query.change(_recommend_button_update, inputs=[query, liked_text], outputs=[recommend_btn])
        liked_text.change(_recommend_button_update, inputs=[query, liked_text], outputs=[recommend_btn])

        # `change` already fires on user typing, so a second `input` listener
        # with the same handler would only render every keystroke twice.
        space_filter.change(
            _apply_filter,
            inputs=[base_results_state, query, liked_text, space_filter],
            outputs=filter_outputs,
        )

        demo.load(load_homepage, outputs=search_outputs)

        query.submit(
            _run_search,
            inputs=search_inputs,
            js=disable_button_js,
            outputs=[recommend_btn, *search_outputs],
        )

        paging_inputs = [results_state, query, liked_text, page_state]
        prev_page_btn.click(
            lambda results, q, liked, page: change_result_page(results, q, liked, int(page or 1) - 1),
            inputs=paging_inputs,
            outputs=paging_outputs,
        )
        next_page_btn.click(
            lambda results, q, liked, page: change_result_page(results, q, liked, int(page or 1) + 1),
            inputs=paging_inputs,
            outputs=paging_outputs,
        )

        for index, button in enumerate(page_buttons):
            # idx is bound as a default so each button keeps its own slot index.
            button.click(
                lambda results, q, liked, targets, page, idx=index: change_result_page(
                    results,
                    q,
                    liked,
                    _resolve_page_target(targets, idx, int(page or 1)),
                ),
                inputs=[results_state, query, liked_text, page_targets_state, page_state],
                outputs=paging_outputs,
            )

        for (category_name, _label, _count), button in zip(_category_pill_defs(), category_buttons):
            # name is bound as a default so each pill searches its own category.
            button.click(
                lambda q, liked, f, name=category_name: search_spaces(q, liked, name, RESULTS_PER_PAGE, f),
                inputs=[query, liked_text, space_filter],
                outputs=search_outputs,
            )

    return demo


def _find_launch_port(preferred_port: int, max_tries: int = 10, host: str = "127.0.0.1") -> int:
    """Return the first port at or after ``preferred_port`` that can be bound on ``host``.

    At most ``max_tries`` consecutive ports are probed (never beyond 65535).
    If none can be bound the preferred port is returned unchanged and a
    warning is logged, leaving the actual failure to the server start-up.
    """
    start = max(1, int(preferred_port))
    for port in range(start, min(start + max_tries, 65536)):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                sock.bind((host, port))
            except OSError:
                continue
            return port
    logger.warning("No free port found in %s tries starting at %s; using %s anyway", max_tries, start, start)
    return start


init_db()


if __name__ == "__main__":
    demo = build_app()
    server_host = "0.0.0.0"
    # Probe the same address the server will bind to.
    launch_port = _find_launch_port(int(os.getenv("GRADIO_SERVER_PORT", "7860")), host=server_host)
    demo.launch(
        server_name=server_host,
        server_port=launch_port,
        css=APP_CSS,
        head=COMMENTS_MODAL_SCRIPT + FIELD_GUIDE_SHELL_SCRIPT,
        allowed_paths=[str(Path(__file__).resolve().parent)],
        ssr_mode=False,
        enable_monitoring=False,
    )