| | import streamlit as st |
| | import os |
| | import google.generativeai as genai |
| | from dotenv import load_dotenv |
| | import time |
| | from typing import Any, List, Optional |
| | import numpy as np |
| | from sklearn.metrics.pairwise import cosine_similarity |
| | import tensorflow as tf |
| | from tensorflow.keras.models import Sequential |
| | from tensorflow.keras.layers import Dense, Input |
| | from tensorflow.keras.utils import to_categorical |
| | from tensorflow.keras.optimizers import Adam |
| |
|
| | |
# Load variables from a local .env file (if any) into the process environment,
# then read the Google API key from there.
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# The key is mandatory: without it, report the problem and stop this run;
# otherwise hand it to the Gemini client library.
if not GOOGLE_API_KEY:
    st.error(
        "Google AI Studio API key not found. Please add it to your .env file. "
        "You can obtain an API key from https://makersuite.google.com/."
    )
    st.stop()
else:
    genai.configure(api_key=GOOGLE_API_KEY)
| |
|
# Main page heading.
st.title("Embeddings and Vector Search Demo")
st.subheader("Explore Embeddings and Vector Databases")

# Sidebar: a short description of the app followed by two reference lists.
sidebar = st.sidebar
sidebar.header("Embeddings and Vector Search")
sidebar.markdown(
    """
    This app demonstrates how embeddings and vector databases can be used for various tasks.
    """
)

sidebar.subheader("Key Concepts:")
sidebar.markdown(
    """
    - **Embeddings**: Numerical representations of text, capturing semantic meaning.
    - **Vector Databases**: Databases optimized for storing and querying vectors (simulated here).
    - **Retrieval Augmented Generation (RAG)**: Combining retrieval with LLM generation.
    - **Cosine Similarity**: A measure of similarity between two vectors.
    - **Neural Networks**: Using embeddings as input for classification.
    """
)

sidebar.subheader("Whitepaper Insights")
sidebar.markdown(
    """
    - Efficient similarity search using vector indexes (e.g., ANN).
    - Handling large datasets and scalability considerations.
    - Applications of embeddings: search, recommendation, classification, etc.
    """
)
| |
|
| | |
def code_block(text: str, language: str = "text") -> None:
    """Display ``text`` verbatim as a code block in Streamlit.

    Args:
        text: The content to show.
        language: Language name passed to Streamlit for syntax highlighting.
    """
    # Use the dedicated code element instead of a hand-built Markdown fence
    # rendered with unsafe_allow_html=True: with the fence, text containing a
    # closing ``` could break out of the block and have the remainder
    # interpreted as Markdown/raw HTML.
    st.code(text, language=language)
| |
|
def display_response(response: Any) -> None:
    """Render the text of a model response, or an error if there is none.

    Args:
        response: The object returned by the generation call (may be None).
            Its ``text`` attribute, when readable, is rendered as Markdown.
    """
    # Reading ``.text`` can raise instead of returning: AttributeError when the
    # attribute does not exist, and ValueError when the response carries no
    # usable text (e.g. a blocked generation). ``hasattr`` only swallows
    # AttributeError, so the access is guarded explicitly here.
    try:
        text = response.text if response else None
    except (AttributeError, ValueError):
        text = None

    if text is None:
        st.error("Failed to generate a response.")
        return

    st.subheader("Generated Response:")
    st.markdown(text)
| |
|
def generate_embeddings(
    texts: List[str],
    model_name: str = "models/embedding-001",
    task_type: str = "retrieval_document",
) -> Optional[List[List[float]]]:
    """Generate one embedding per input text.

    Args:
        texts: Text strings to embed; the result keeps the same order.
        model_name: Name of the embedding model.
        task_type: Task type forwarded to ``genai.embed_content``. Defaults to
            "retrieval_document" (the previously hard-coded value); callers
            embedding a search query can pass a different type.

    Returns:
        A list with one embedding (list of floats) per text, or None if any
        request failed (the error is shown in the Streamlit UI).
    """
    try:
        # One request per text; the first failure aborts the whole batch so the
        # result is never partially filled or misaligned with ``texts``.
        return [
            genai.embed_content(
                model=model_name,
                content=text,
                task_type=task_type,
            )["embedding"]
            for text in texts
        ]
    except Exception as e:  # UI boundary: report instead of crashing the app.
        st.error(f"Error generating embeddings with model '{model_name}': {e}")
        return None
| |
|
def generate_with_retry(prompt: str, model_name: str, generation_config: genai.types.GenerationConfig, max_retries: int = 3, delay: int = 5) -> Any:
    """Call the generative model, retrying after failures.

    Each failed attempt is reported as a warning. A failure whose message
    contains both "404" and "not found" is treated as an unavailable model and
    is not retried. Otherwise the call is retried after ``delay`` seconds until
    ``max_retries`` attempts have been made.

    Returns:
        The model response on success, or None when every attempt failed, the
        model was reported missing, or ``max_retries`` allows no attempt.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return genai.GenerativeModel(model_name).generate_content(
                prompt, generation_config=generation_config
            )
        except Exception as exc:
            details = str(exc)
            st.warning(f"Error during generation (attempt {attempt}/{max_retries}): {details}")

            model_missing = "404" in details and "not found" in details
            if model_missing:
                # Retrying cannot help when the model itself is unavailable.
                st.error(
                    f"Model '{model_name}' is not available or not supported. Please select a different model."
                )
                return None

            if attempt >= max_retries:
                st.error(f"Failed to generate content after {max_retries} attempts. Please check your prompt and model.")
                return None

            st.info(f"Retrying in {delay} seconds...")
            time.sleep(delay)

    # Only reached when the loop body never ran (max_retries <= 0).
    return None
| |
|
def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float:
    """Return the cosine similarity of two embedding vectors."""
    # cosine_similarity works on 2-D inputs, so each vector becomes a single row.
    first_row = np.array(embedding1).reshape(1, -1)
    second_row = np.array(embedding2).reshape(1, -1)
    pairwise = cosine_similarity(first_row, second_row)
    # The result is a 1x1 matrix; its only entry is the similarity.
    return pairwise[0][0]
| |
|
def create_and_train_model(
    embeddings: List[List[float]],
    labels: List[int],
    num_classes: int,
    epochs: int,
    batch_size: int,
    learning_rate: float,
    optimizer_str: str
) -> tf.keras.Model:
    """Build and train a small dense classifier on top of embeddings.

    The network is Input -> Dense(64, relu) -> Dense(32, relu) ->
    Dense(num_classes, softmax), trained with categorical cross-entropy.

    Args:
        embeddings: Training vectors; the length of the first one sets the
            input size.
        labels: One integer class id per embedding. They are one-hot encoded
            with ``to_categorical``, so they are expected to lie in
            ``0 .. num_classes - 1``.
        num_classes: Number of output classes.
        epochs: Number of training epochs.
        batch_size: Training batch size.
        learning_rate: Learning rate given to the optimizer.
        optimizer_str: "adam", "sgd" or "rmsprop" (case-insensitive); any other
            value falls back to Adam.

    Returns:
        The trained Keras model.

    Raises:
        ValueError: If ``embeddings`` is empty or its length differs from
            ``labels``.
    """
    if not embeddings:
        raise ValueError("At least one training embedding is required.")
    if len(embeddings) != len(labels):
        raise ValueError(
            f"Got {len(embeddings)} embeddings but {len(labels)} labels."
        )

    model = Sequential([
        Input(shape=(len(embeddings[0]),)),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ])

    # Unknown optimizer names fall back to Adam.
    optimizer_classes = {
        'adam': Adam,
        'sgd': tf.keras.optimizers.SGD,
        'rmsprop': tf.keras.optimizers.RMSprop,
    }
    optimizer_class = optimizer_classes.get(optimizer_str.lower(), Adam)
    optimizer = optimizer_class(learning_rate=learning_rate)

    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    encoded_labels = to_categorical(labels, num_classes=num_classes)
    model.fit(np.array(embeddings), encoded_labels, epochs=epochs, batch_size=batch_size, verbose=0)
    return model
| | |
# --- RAG demo: retrieve the most similar context line, then ask the model ---
st.header("RAG Question Answering")
rag_model_name = st.selectbox("Select model for RAG:", ["gemini-pro"], index=0)
rag_embedding_model = st.selectbox("Select embedding model for RAG:", ["models/embedding-001"], index=0)
rag_context = st.text_area(
    "Enter your context documents:",
    "Relevant information to answer the question. Separate documents with newlines.",
    height=150,
)
rag_question = st.text_area("Ask a question about the context:", "What is the main topic?", height=70)
rag_max_context_length = st.number_input("Maximum Context Length", min_value=100, max_value=2000, value=500, step=100)

if st.button("Answer with RAG"):
    # Each non-blank line is one document. Blank lines are dropped so that
    # empty strings are never sent to the embedding model, and the list is
    # built once so similarity indices always match the documents.
    rag_documents = [line for line in rag_context.split('\n') if line.strip()]

    if not rag_documents or not rag_question.strip():
        st.warning("Please provide both context and a question.")
    else:
        with st.spinner("Generating answer..."):
            try:
                # generate_embeddings reports its own errors and returns None;
                # in that case this section is simply skipped. st.stop() is
                # deliberately not used: it would halt the whole script and
                # keep the sections further down the page from rendering.
                context_embeddings = generate_embeddings(rag_documents, rag_embedding_model)
                question_embedding = (
                    generate_embeddings([rag_question], rag_embedding_model)
                    if context_embeddings
                    else None
                )

                if context_embeddings and question_embedding:
                    # Similarity of the question against every document.
                    similarities = cosine_similarity(
                        np.array(question_embedding).reshape(1, -1),
                        np.array(context_embeddings),
                    )[0]

                    # Keep the best-matching document, capped at the chosen length.
                    most_relevant_index = int(np.argmax(similarities))
                    relevant_context = rag_documents[most_relevant_index][:rag_max_context_length]

                    rag_prompt = f"Use the following context to answer the question: '{rag_question}'.\nContext: {relevant_context}"

                    response = generate_with_retry(rag_prompt, rag_model_name, generation_config=genai.types.GenerationConfig())
                    if response:
                        display_response(response)
            except Exception as e:
                st.error(f"An error occurred: {e}")
| |
|
| | |
# --- Text similarity demo: cosine similarity between two embedded texts ---
st.header("Text Similarity")
similarity_embedding_model = st.selectbox("Select embedding model for similarity:", ["models/embedding-001"], index=0)
text1 = st.text_area("Enter text 1:", "This is the first sentence.", height=70)
text2 = st.text_area("Enter text 2:", "This is a similar sentence.", height=70)

if st.button("Calculate Similarity"):
    # Whitespace-only input counts as missing.
    if not text1.strip() or not text2.strip():
        st.warning("Please provide both texts.")
    else:
        with st.spinner("Calculating similarity..."):
            try:
                embeddings = generate_embeddings([text1, text2], similarity_embedding_model)
                # On failure generate_embeddings has already shown an error.
                # Skip the result instead of calling st.stop(), which would
                # halt the script and hide the sections further down the page.
                if embeddings:
                    similarity = calculate_similarity(embeddings[0], embeddings[1])
                    st.subheader("Cosine Similarity:")
                    st.write(similarity)
            except Exception as e:
                st.error(f"An error occurred: {e}")
| |
|
| | |
# --- Classification demo: input widgets (training data, text, hyperparameters) ---
st.header("Neural Classification with Embeddings")

classification_embedding_model = st.selectbox(
    label="Select embedding model for classification:",
    options=["models/embedding-001"],
    index=0,
)

# Training examples, one "text,label" pair per line.
classification_data = st.text_area(
    label="Enter your training data (text, label pairs), separated by newlines. Example: text1,0\\ntext2,1",
    value="text1,0\ntext2,1\ntext3,0\ntext4,1",
    height=150,
)
classification_prompt = st.text_area(
    label="Enter text to classify:",
    value="This is a test text.",
    height=70,
)

# Training hyperparameters.
num_epochs = st.number_input(
    label="Number of Epochs", min_value=1, max_value=200, value=10, step=1
)
batch_size = st.number_input(
    label="Batch Size", min_value=1, max_value=128, value=32, step=1
)
learning_rate = st.number_input(
    label="Learning Rate",
    min_value=0.0001,
    max_value=0.1,
    value=0.0001,
    step=0.0001,
    format="%.4f",
)
optimizer_str = st.selectbox(
    label="Optimizer",
    options=['adam', 'sgd', 'rmsprop'],
    index=0,
)
| |
|
def process_classification_data(data: str) -> Optional[tuple[List[str], List[int]]]:
    """Parse "text,label" lines into parallel lists of texts and labels.

    Lines without a comma (including blank lines) are ignored. Each remaining
    line is split at its LAST comma, so the text itself may contain commas;
    both parts are stripped of surrounding whitespace.

    Args:
        data: Raw multi-line input, one "text,label" pair per line.

    Returns:
        ``(texts, labels)`` on success, or None after showing an error in the
        Streamlit UI when no pair is found or a label is not an integer.
    """
    texts: List[str] = []
    labels: List[int] = []

    # Number the raw lines so error messages point at the line the user sees,
    # not at a position in the filtered list.
    for line_number, line in enumerate(data.splitlines(), start=1):
        if ',' not in line:
            continue

        text_part, label_part = line.rsplit(',', 1)
        label_str = label_part.strip()
        try:
            label = int(label_str)
        except ValueError:
            st.error(f"Invalid label value in line {line_number}: '{label_str}'. Label must be an integer.")
            return None

        texts.append(text_part.strip())
        labels.append(label)

    if not texts:
        st.error("No valid data pairs found. Please ensure each line contains 'text,label'.")
        return None
    return texts, labels
| |
|
def _classify_text(
    training_data: str,
    text_to_classify: str,
    embedding_model: str,
    epochs: int,
    batch: int,
    lr: float,
    optimizer_name: str,
) -> None:
    """Train a classifier on the labelled examples and show the prediction.

    Returns early (without stopping the Streamlit script) whenever a step
    fails; the helper that failed has already displayed its own error.
    """
    processed_data = process_classification_data(training_data)
    if not processed_data:
        return
    train_texts, train_labels = processed_data

    # Map the user's labels onto 0..k-1 before training: one-hot encoding needs
    # contiguous class indices, while users may enter e.g. 1/2 or 10/20. For
    # labels that already are 0..k-1 this mapping is the identity.
    class_labels = sorted(set(train_labels))
    index_of_label = {label: index for index, label in enumerate(class_labels)}
    encoded_labels = [index_of_label[label] for label in train_labels]

    train_embeddings = generate_embeddings(train_texts, embedding_model)
    if not train_embeddings:
        return

    model = create_and_train_model(
        train_embeddings, encoded_labels, len(class_labels), epochs, batch, lr, optimizer_name
    )

    predict_embedding = generate_embeddings([text_to_classify], embedding_model)
    if not predict_embedding:
        return

    # predict_embedding is already a list holding one vector, i.e. a
    # (1, dim) batch; wrapping it in another list would add a third axis.
    prediction = model.predict(np.array(predict_embedding), verbose=0)
    predicted_class = class_labels[int(np.argmax(prediction[0]))]

    st.subheader("Predicted Class:")
    st.write(predicted_class)
    # Probability columns follow the sorted order of the distinct labels.
    st.subheader("Prediction Probabilities:")
    st.write(prediction)


if st.button("Classify"):
    if not classification_data or not classification_prompt:
        st.warning("Please provide training data and text to classify.")
    else:
        with st.spinner("Classifying..."):
            try:
                _classify_text(
                    classification_data,
                    classification_prompt,
                    classification_embedding_model,
                    num_epochs,
                    batch_size,
                    learning_rate,
                    optimizer_str,
                )
            except Exception as e:
                st.error(f"An error occurred: {e}")