import json import logging import os import uuid import pickle from datetime import datetime from typing import Optional from contextlib import contextmanager from pathlib import Path import numpy as np import pandas as pd import streamlit as st import sys from sentence_transformers import SentenceTransformer from huggingface_hub import snapshot_download # Add the project root to the Python path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))) import src.book_recommender.core.config as config from src.book_recommender.core.exceptions import ( DataNotFoundError, ) from src.book_recommender.core.logging_config import ( configure_logging, ) from src.book_recommender.data.processor import clean_and_prepare_data from src.book_recommender.ml.clustering import ( cluster_books, get_cluster_names, ) from src.book_recommender.ml.embedder import generate_embedding_for_query from src.book_recommender.ml.explainability import explain_recommendation from src.book_recommender.ml.feedback import save_feedback from src.book_recommender.ml.recommender import BookRecommender from src.book_recommender.utils import get_cover_url_multi_source, load_book_covers_batch configure_logging(log_file="app.log", log_level=os.getenv("LOG_LEVEL", "INFO")) logger = logging.getLogger(__name__) st.set_page_config( page_title="DeepShelf Legacy Demo", page_icon="📚", layout="wide", initial_sidebar_state="collapsed" ) @st.cache_resource(show_spinner=False) def ensure_data_available(): """ Checks if data exists. If not, downloads from Hugging Face. Uses st.secrets or env vars for configuration. """ if ( config.PROCESSED_DATA_PATH.exists() and config.EMBEDDINGS_PATH.exists() and config.CLUSTERS_CACHE_PATH.exists() ): return # Try getting ID from env var or Streamlit secrets dataset_id = os.getenv("HF_DATASET_ID") if not dataset_id and "HF_DATASET_ID" in st.secrets: dataset_id = st.secrets["HF_DATASET_ID"] if not dataset_id: st.warning("HF_DATASET_ID not found. Using local data generation (slow). Set HF_DATASET_ID to download pre-computed data.") return try: with st.spinner(f"Downloading 100MB+ dataset from Hugging Face ({dataset_id})... this happens only once!"): config.PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True) snapshot_download( repo_id=dataset_id, repo_type="dataset", local_dir=config.PROCESSED_DATA_DIR, local_dir_use_symlinks=False, allow_patterns=["*.parquet", "*.npy", "*.pkl", "*.json"], token=os.getenv("HF_TOKEN") # Optional for public datasets ) st.success("Data downloaded successfully!") except Exception as e: st.error(f"Failed to download data: {e}") logger.error(f"Download failed: {e}") @st.cache_resource(show_spinner=False) def load_embedding_model() -> SentenceTransformer: """ Load and cache the sentence-transformer model. This ensures the model is loaded only once per session/process. """ logger.info(f"Loading embedding model: {config.EMBEDDING_MODEL}") return SentenceTransformer(config.EMBEDDING_MODEL) @contextmanager def custom_spinner(text="Loading..."): """ A custom spinner that uses CSS/HTML for a better loading animation. """ placeholder = st.empty() placeholder.markdown( f"""

{text}

""", unsafe_allow_html=True, ) try: yield finally: placeholder.empty() st.markdown( """ """, unsafe_allow_html=True, ) @st.cache_resource(show_spinner=False) def load_recommender() -> BookRecommender: """Load recommender without cluttering UI with status messages.""" files_exist = ( os.path.exists(config.PROCESSED_DATA_PATH) and os.path.exists(config.EMBEDDINGS_PATH) and os.path.exists(config.EMBEDDING_METADATA_PATH) ) model_changed = False if files_exist: with open(config.EMBEDDING_METADATA_PATH, "r", encoding="utf-8") as f: metadata = json.load(f) if metadata.get("model_name") != config.EMBEDDING_MODEL: model_changed = True if files_exist and not model_changed: book_data = pd.read_parquet(config.PROCESSED_DATA_PATH) embeddings = np.load(config.EMBEDDINGS_PATH) else: from src.book_recommender.ml.embedder import generate_embeddings if not os.path.exists(config.RAW_DATA_PATH): raise DataNotFoundError(f"Raw data file not found at: {config.RAW_DATA_PATH}") book_data = clean_and_prepare_data(str(config.RAW_DATA_PATH), str(config.PROCESSED_DATA_PATH)) embeddings = generate_embeddings(book_data, model_name=config.EMBEDDING_MODEL, show_progress_bar=False) np.save(config.EMBEDDINGS_PATH, embeddings) metadata = {"model_name": config.EMBEDDING_MODEL} with open(config.EMBEDDING_METADATA_PATH, "w", encoding="utf-8") as f: json.dump(metadata, f) recommender = BookRecommender(book_data=book_data, embeddings=embeddings) return recommender @st.cache_resource(show_spinner=False) def load_cluster_data() -> tuple[np.ndarray, dict, pd.DataFrame]: """ Generates and returns cached book clusters, cluster names, and book data with cluster IDs. Tries to load from disk first, then falls back to computation and saves to disk. """ logger.info("Generating/Loading cluster data for Streamlit app...") recommender = load_recommender() book_data_df = recommender.book_data.copy() # Try loading from disk if os.path.exists(config.CLUSTERS_CACHE_PATH): try: logger.info(f"Loading cached clusters from {config.CLUSTERS_CACHE_PATH}") with open(config.CLUSTERS_CACHE_PATH, "rb") as f: cached_data = pickle.load(f) clusters_arr = cached_data["clusters_arr"] names = cached_data["names"] # Verify consistency if len(clusters_arr) == len(book_data_df): book_data_df["cluster_id"] = clusters_arr logger.info("Cluster data loaded from disk cache.") return clusters_arr, names, book_data_df else: logger.warning("Cached cluster data size mismatch. Recomputing...") except Exception as e: logger.error(f"Failed to load cluster cache: {e}. Recomputing...") # Recompute if not found or failed embeddings_arr = recommender.embeddings clusters_arr, _ = cluster_books(embeddings_arr, n_clusters=config.NUM_CLUSTERS) book_data_df["cluster_id"] = clusters_arr names = get_cluster_names(book_data_df, clusters_arr) # Save to disk try: with open(config.CLUSTERS_CACHE_PATH, "wb") as f: pickle.dump({"clusters_arr": clusters_arr, "names": names}, f) logger.info(f"Cluster data saved to {config.CLUSTERS_CACHE_PATH}") except Exception as e: logger.error(f"Failed to save cluster cache: {e}") logger.info("Cluster data generated/loaded and cached for Streamlit app.") return clusters_arr, names, book_data_df def render_header(): """Modern header with gradient background.""" st.markdown( """

📚 DeepShelf Legacy Demo

Explore the power of semantic search

""", unsafe_allow_html=True, ) def render_search_section(): """Enhanced search interface with examples.""" st.markdown('
', unsafe_allow_html=True) st.markdown("### What kind of book are you looking for?") col1, col2, col3, col4, col5 = st.columns([1, 2, 2, 2, 1]) with col2: if st.button("Fantasy", key="example1", width="stretch"): st.session_state.query = "A fantasy adventure with magic and dragons" with col3: if st.button("Mystery", key="example2", width="stretch"): st.session_state.query = "A psychological thriller with unexpected twists" with col4: if st.button("Romance", key="example3", width="stretch"): st.session_state.query = "A heartwarming romance set in a small town" # Main search input query = st.text_area( "Describe the book you want", value=st.session_state.get("query", ""), height=100, placeholder=( "Example: A science fiction novel about time travel with " "complex characters and philosophical themes..." ), label_visibility="collapsed", ) col1, col2, col3 = st.columns([1, 2, 1]) with col2: search_button = st.button("Find My Perfect Books", type="primary", width="stretch") st.markdown("
", unsafe_allow_html=True) return query, search_button def render_book_card(rec, index, cover_url, query_text: Optional[str] = None): """Render a beautiful book card with all details.""" # Generate stars string rating_html = '
 
' if rec.get("rating") and rec.get("rating") != "N/A" and pd.notna(rec["rating"]): try: rating = float(rec["rating"]) stars = "⭐" * int(rating) rating_html = f'
{stars} {rating:.1f}/5
' except Exception: pass # Generate genres HTML genres_html = '
 
' if rec.get("genres") and isinstance(rec.get("genres"), str): genres = [g.strip() for g in rec["genres"].split(",")[:3]] pills = "".join([f'{g.title()}' for g in genres]) genres_html = f'
{pills}
' # Generate match badge match_html = "" if "similarity" in rec: similarity_percent = rec["similarity"] * 100 match_html = f'
{similarity_percent:.0f}% Match
' # Single HTML block for the card content card_html = f"""
{rec['title']}
{match_html}
{rec['title']}
by {rec.get("authors", "Unknown Author")}
{rating_html} {genres_html}
""" st.markdown(card_html, unsafe_allow_html=True) # Interactive elements (outside the HTML block) with st.container(): if rec.get("description"): with st.expander("Read More"): desc = rec["description"][:250] + "..." if len(rec["description"]) > 250 else rec["description"] st.write(desc) if query_text and "similarity" in rec: with st.expander("Why this recommendation?"): from src.book_recommender.ml.explainability import ( explain_recommendation, ) explanation = explain_recommendation( query_text=query_text, recommended_book=rec, similarity_score=rec["similarity"] ) st.info(explanation["summary"]) if explanation.get("matching_features"): st.markdown("**Matching features:**") for feature in explanation["matching_features"]: st.markdown(f"• {feature}") st.markdown('
', unsafe_allow_html=True) if query_text: col_a, col_b, col_c = st.columns([1, 1, 3]) with col_a: if st.button("👍", key=f"like_{index}", help="Good recommendation", use_container_width=True): save_feedback(query_text, rec, "positive", st.session_state.session_id) st.toast(f"Feedback saved!", icon="👍") with col_b: if st.button("👎", key=f"dislike_{index}", help="Not relevant", use_container_width=True): save_feedback(query_text, rec, "negative", st.session_state.session_id) st.toast(f"Feedback saved!", icon="👎") with col_c: if st.button("View Details", key=f"details_{index}", use_container_width=True): show_book_details(rec, query_text=query_text) else: if st.button("View Details", key=f"details_{index}", use_container_width=True): show_book_details(rec, query_text=query_text) st.markdown('
', unsafe_allow_html=True) @st.dialog("Book Details") def show_book_details(book, query_text: Optional[str] = None): col1, col2 = st.columns([1, 2]) with col1: st.image(get_cover_url_multi_source(book["title"], book.get("authors", ""))) with col2: st.markdown(f"## {book['title']}") st.markdown(f"**by {book.get('authors', 'Unknown Author')}**") if query_text and "similarity" in book: explanation = explain_recommendation( query_text=query_text, recommended_book=book, similarity_score=book["similarity"] ) st.markdown(f"**Match Score:** {explanation['match_score']}% (Confidence: {explanation['confidence']})") st.write(explanation["summary"]) st.divider() similarity_percent = book["similarity"] * 100 st.markdown(f"**{similarity_percent:.0f}% Match**") if book.get("rating") and pd.notna(book["rating"]): try: rating = float(book["rating"]) stars = "⭐" * int(rating) st.markdown(f"{stars} **{rating:.1f}/5**") except Exception: pass if book.get("genres"): st.markdown(f"**Genres:** {book['genres']}") st.divider() if book.get("description"): st.markdown("### Description") st.write(book["description"]) st.divider() col_a, col_b = st.columns(2) with col_a: st.link_button( "Search on Google", f"https://www.google.com/search?q={book['title']}+{book.get('authors', '')}" ) with col_b: st.link_button("View on Goodreads", f"https://www.goodreads.com/search?q={book['title']}") def render_sidebar(recommender): """Renders the sidebar with filters and search history.""" with st.sidebar: st.markdown("## Recent Searches") if not st.session_state.search_history: st.info("Your recent searches will appear here.") else: for item in reversed(st.session_state.search_history): if st.button(f"'{item['query'][:20]}...' ({item['results_count']} found)", key=item["timestamp"]): st.session_state.query = item["query"] st.rerun() st.markdown("---") st.markdown("## Filters") if "recommendations" in st.session_state and st.session_state.recommendations: st.session_state.min_rating = st.slider( "Minimum Rating", 0.0, 5.0, st.session_state.get("min_rating", 0.0), 0.5 ) all_genres = set() for rec in st.session_state.recommendations: if rec.get("genres"): genres = [g.strip() for g in rec["genres"].split(",")] all_genres.update(genres) st.session_state.selected_genres = st.multiselect( "Genres", sorted(list(all_genres)), default=st.session_state.get("selected_genres", []) ) else: st.info("Perform a search to see filters.") def main(): """Main application with UI.""" # Ensure data is present before doing anything else ensure_data_available() render_header() if "query" not in st.session_state: st.session_state.query = "" if "search_history" not in st.session_state: st.session_state.search_history = [] if "recommendations" not in st.session_state: st.session_state.recommendations = [] if "session_id" not in st.session_state: st.session_state.session_id = str(uuid.uuid4()) try: recommender = load_recommender() render_sidebar(recommender) tab1, tab2 = st.tabs(["Search", "Browse Collections"]) with tab1: query, search_button = render_search_section() if (search_button and query.strip()) or (query and query != st.session_state.get("last_query", "")): st.session_state.last_query = query with custom_spinner("Finding the perfect books for you..."): # Use the cached model instance embedding_model = load_embedding_model() query_embedding = generate_embedding_for_query(query, model=embedding_model) st.session_state.recommendations = recommender.get_recommendations_from_vector( query_embedding, top_k=10, similarity_threshold=0.25, ) if st.session_state.recommendations: st.session_state.search_history.append( { "query": query, "timestamp": datetime.now(), "results_count": len(st.session_state.recommendations), } ) st.session_state.search_history = st.session_state.search_history[-5:] st.rerun() else: st.warning( "No books found matching your description. Try being more specific or use different keywords!" ) st.session_state.recommendations = [] elif search_button: st.warning("Please describe what kind of book you're looking for!") if st.session_state.recommendations: filtered = st.session_state.recommendations min_rating = st.session_state.get("min_rating", 0.0) selected_genres = st.session_state.get("selected_genres", []) if min_rating > 0.0: filtered = [ rec for rec in filtered if rec.get("rating") and pd.notna(rec["rating"]) and float(rec.get("rating", 0)) >= min_rating ] if selected_genres: filtered = [ rec for rec in filtered if rec.get("genres") and any(g in rec.get("genres", "") for g in selected_genres) ] st.markdown( f'
Found {len(filtered)} Perfect Books For You
', unsafe_allow_html=True, ) if not filtered: st.info("No books match your current filters. Try adjusting them!") else: visible_recs = filtered[:12] with custom_spinner("Loading book covers..."): covers_dict = load_book_covers_batch(visible_recs) for row_idx in range(0, min(len(filtered), 12), 4): cols = st.columns(4, gap="medium") for col_idx, rec in enumerate(filtered[row_idx : row_idx + 4]): with cols[col_idx]: unique_idx = row_idx + col_idx cover_url = covers_dict.get(rec["title"], config.FALLBACK_COVER_URL) render_book_card(rec, unique_idx, cover_url, query_text=query) with tab2: st.markdown('
', unsafe_allow_html=True) st.markdown("### Browse Books by Collection") clusters_arr, cluster_names, book_data_with_clusters = load_cluster_data() cluster_options = [ f"{name} ({np.sum(clusters_arr == cluster_id)} books)" for cluster_id, name in cluster_names.items() ] selected_cluster_option = st.selectbox("Select a Collection", options=cluster_options, index=0) selected_cluster_id = int(list(cluster_names.keys())[cluster_options.index(selected_cluster_option)]) cluster_books_df = book_data_with_clusters[book_data_with_clusters["cluster_id"] == selected_cluster_id] st.markdown( f'
Books in {cluster_names[selected_cluster_id]}
', unsafe_allow_html=True, ) if not cluster_books_df.empty: cluster_recs = cluster_books_df.to_dict(orient="records") visible_recs = cluster_recs[:12] with custom_spinner("Loading book covers..."): covers_dict = load_book_covers_batch(visible_recs) for row_idx in range(0, min(len(cluster_recs), 12), 4): cols = st.columns(4, gap="medium") for col_idx, rec in enumerate(cluster_recs[row_idx : row_idx + 4]): with cols[col_idx]: unique_idx = f"cluster_{selected_cluster_id}_{row_idx + col_idx}" cover_url = covers_dict.get(rec["title"], config.FALLBACK_COVER_URL) render_book_card({**rec, "similarity": 1.0}, unique_idx, cover_url) else: st.info("No books found in this collection.") st.markdown("
", unsafe_allow_html=True) except Exception as e: st.error(f"Something went wrong: {str(e)}") logger.error(f"Application error: {e}", exc_info=True) if __name__ == "__main__": main()