# NOTE: Hugging Face Spaces UI residue ("Spaces: Sleeping") was pasted above the
# source; kept here as a comment so the file remains valid Python.
# Standard library
import logging
import os
import pickle
import re
from typing import Any, Dict, List

# Third-party
import chromadb
import numpy as np
import streamlit as st
from chromadb.config import Settings
from dotenv import load_dotenv
from langsmith import Client
from langsmith import traceable
from langsmith.wrappers import wrap_openai
from nltk.tokenize import word_tokenize
from openai import OpenAI
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # Load environment variables | |
| #load_dotenv() | |
| # Obtener las claves de los secrets de Hugging Face | |
| #OpenAI.api_key = st.secrets["OPENAI_API_KEY"].strip() | |
| #os.environ["LANGCHAIN_API_KEY"] = st.secrets["LANGCHAIN_API_KEY"] | |
| #os.environ["LANGCHAIN_TRACING_V2"] = st.secrets["LANGCHAIN_TRACING_V2"] | |
| # Initialize LangSmith client | |
| langsmith_client = Client() | |
| # Wrap OpenAI client with LangSmith | |
| openai = wrap_openai(OpenAI()) | |
def detect_language(text: str) -> str:
    """
    Simple language detection for English/Spanish based on common words.

    Args:
        text: Input text to detect language

    Returns:
        str: 'es' for Spanish, 'en' for English (default when no Spanish
        indicator word is found, including for empty input)
    """
    # Common Spanish words/characters
    spanish_indicators = {'qué', 'cuál', 'cómo', 'dónde', 'por', 'para', 'perro', 'comida',
                          'mejor', 'precio', 'barato', 'caro', 'cachorro', 'adulto'}
    # Tokenize on word boundaries so indicators only match whole words.
    # Plain substring matching misfires: 'por' is a substring of English
    # words such as "important" or "report".
    words = set(re.findall(r'\w+', text.lower(), flags=re.UNICODE))
    # Any whole-word Spanish indicator => Spanish; otherwise default to English
    return 'es' if words & spanish_indicators else 'en'
class DogFoodQASystem:
    """Hybrid retrieval QA system over dog-food product descriptions.

    Combines BM25 keyword search with ChromaDB vector search, merges the two
    result sets while tracking which retriever found each chunk, and generates
    an LLM answer grounded in the retrieved product context.
    """

    def __init__(self):
        """Initialize the QA system with vector stores and models."""
        self.load_stores()

    def load_stores(self) -> None:
        """Load BM25 and ChromaDB stores.

        Populates:
            self.bm25_data: dict with 'model', 'chunks' and 'metadata' entries
                (shape inferred from usage in hybrid_search — TODO confirm).
            self.chroma_client / self.collection: persistent Chroma store.
        """
        # NOTE(review): pickle.load is only safe on files we produced
        # ourselves (the index build step); never point this at untrusted data.
        with open('bm25_index.pkl', 'rb') as f:
            self.bm25_data = pickle.load(f)
        self.chroma_client = chromadb.PersistentClient(path="chroma_db")
        self.collection = self.chroma_client.get_collection("dog_food_descriptions")

    def hybrid_search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Hybrid search that gets top_k results from each source and combines unique results.

        Args:
            query: User query text.
            top_k: Number of results fetched from EACH retriever before merging,
                so the merged list holds up to 2 * top_k unique entries.

        Returns:
            List of result dicts with 'score', 'text', 'metadata', 'source',
            plus merge bookkeeping ('sources', 'original_scores').
        """
        logging.info(f"\n{'='*50}\nStarting hybrid search for query: {query}")

        # BM25 search - get top_k results
        tokenized_query = word_tokenize(query.lower())
        bm25_scores = self.bm25_data['model'].get_scores(tokenized_query)
        # argsort ascending, reversed -> indices of the top_k highest scores
        bm25_indices = np.argsort(bm25_scores)[::-1][:top_k]
        bm25_results = [
            {
                'score': float(bm25_scores[idx]),
                'text': self.bm25_data['chunks'][idx],
                'metadata': self.bm25_data['metadata'][idx],
                'source': 'BM25'
            }
            for idx in bm25_indices
        ]
        logging.info(f"Retrieved {len(bm25_results)} results from BM25")

        # Vector search - get top_k results; any failure degrades gracefully
        # to BM25-only results instead of crashing the query.
        try:
            embedding_response = openai.embeddings.create(
                model="text-embedding-ada-002",
                input=query
            )
            query_embedding = embedding_response.data[0].embedding
            chroma_results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,  # Get top_k results
                include=["documents", "metadatas", "distances"]
            )
            processed_vector_results = [
                {
                    # Chroma returns distances; convert to a similarity-like
                    # score so larger is better, as with BM25.
                    'score': float(1 - distance),
                    'text': doc,
                    'metadata': meta,
                    'source': 'Vector'
                }
                for doc, meta, distance in zip(
                    chroma_results['documents'][0],
                    chroma_results['metadatas'][0],
                    chroma_results['distances'][0]
                )
            ]
            logging.info(f"Retrieved {len(processed_vector_results)} results from Vector search")
        except Exception as e:
            logging.error(f"Error in vector search: {str(e)}", exc_info=True)
            processed_vector_results = []

        # Combine results
        return self._smart_combine_results(bm25_results, processed_vector_results, query)

    def _smart_combine_results(self, bm25_results: List[Dict], vector_results: List[Dict], query: str) -> List[Dict]:
        """
        Combine results from both sources, tracking duplicates and sources.

        Results with identical 'text' are merged into one entry whose
        'sources' list and 'original_scores' dict record both retrievers.
        The displayed 'score' keeps whichever entry was seen first
        (vector results are processed before BM25).
        """
        logging.info("\nCombining search results...")

        # Keyed by chunk text so duplicates across retrievers collapse
        combined_dict = {}

        # Process vector results first
        for result in vector_results:
            text = result['text']
            if text not in combined_dict:
                result['sources'] = ['Vector']
                result['original_scores'] = {'Vector': result['score']}
                combined_dict[text] = result
                logging.info(f"Added Vector result (score: {result['score']:.4f})")
            else:
                combined_dict[text]['sources'].append('Vector')
                combined_dict[text]['original_scores']['Vector'] = result['score']
                logging.info(f"Marked existing result as found by Vector (score: {result['score']:.4f})")

        # Process BM25 results
        for result in bm25_results:
            text = result['text']
            if text not in combined_dict:
                result['sources'] = ['BM25']
                result['original_scores'] = {'BM25': result['score']}
                combined_dict[text] = result
                logging.info(f"Added BM25 result (score: {result['score']:.4f})")
            else:
                combined_dict[text]['sources'].append('BM25')
                combined_dict[text]['original_scores']['BM25'] = result['score']
                logging.info(f"Marked existing result as found by BM25 (score: {result['score']:.4f})")

        # Convert to list
        all_results = list(combined_dict.values())

        # Calculate statistics for the log
        total_results = len(all_results)
        duplicates = sum(1 for r in all_results if len(r['sources']) > 1)
        vector_only = sum(1 for r in all_results if r['sources'] == ['Vector'])
        bm25_only = sum(1 for r in all_results if r['sources'] == ['BM25'])

        logging.info(f"\nResults Statistics:")
        logging.info(f"- Total unique results: {total_results}")
        logging.info(f"- Duplicates (found by both): {duplicates}")
        logging.info(f"- Vector only: {vector_only}")
        logging.info(f"- BM25 only: {bm25_only}")

        return all_results

    def _adjust_score_with_metadata(self, result: Dict, query: str) -> float:
        """Adjust search score based on metadata relevance.

        Multiplicative boosts: 1.2 for well-reviewed products (> 20 reviews),
        1.3 for price-range matches, 1.25 for a dog-type match.

        Returns:
            The boosted score (base score when nothing matches).
        """
        base_score = result['score']
        metadata = result['metadata']

        # Initialize boost factors
        boost = 1.0

        # Boost based on reviews (social proof)
        if metadata.get('reviews', 0) > 20:
            boost *= 1.2

        # Boost based on price range mentions (each branch pairs the English
        # keyword with its Spanish counterpart, mirroring 'affordable'/'barato').
        query_lower = query.lower()
        if ('affordable' in query_lower or 'barato' in query_lower) and metadata.get('price', 0) < 50:
            boost *= 1.3
        # FIX: the original tested 'premium' twice; 'caro' is the Spanish term.
        elif ('premium' in query_lower or 'caro' in query_lower) and metadata.get('price', 0) > 100:
            boost *= 1.3

        # Boost based on specific dog type matches
        dog_types = ['puppy', 'adult', 'senior', 'cachorro', 'adulto']
        for dog_type in dog_types:
            if dog_type in query_lower and dog_type in metadata.get('dog_type', '').lower():
                boost *= 1.25
                break

        return base_score * boost

    def generate_answer(self, query: str, search_results: List[Dict]) -> str:
        """Generate a natural language answer based on search results.

        Args:
            query: User question (English or Spanish).
            search_results: Results from hybrid_search used as grounding context.

        Returns:
            The model's answer text, stripped of surrounding whitespace.
        """
        # Detect query language so the system prompt matches it
        query_lang = detect_language(query)

        # Prepare context from search results
        context = self._prepare_context(search_results)

        # Language-specific system prompts
        prompts = {
            'es': """Eres un experto en nutrición canina. Responde a la pregunta utilizando solo el contexto proporcionado.
            Si no puedes responder con el contexto dado, indícalo. Incluye información sobre precios y características
            específicas de los productos cuando sea relevante.""",
            'en': """You are a dog nutrition expert. Answer the question using only the provided context.
            If you cannot answer from the given context, say so. Include pricing and specific product
            features when relevant."""
        }
        # FIX: fall back to the full English prompt for unknown language codes.
        # The original `.get(query_lang, 'en')` would have used the literal
        # string "en" as the system prompt.
        system_prompt = prompts.get(query_lang, prompts['en'])

        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
            ],
            temperature=0.7,
            max_tokens=300
        )

        return response.choices[0].message.content.strip()

    def _prepare_context(self, search_results: List[Dict]) -> str:
        """Prepare search results as context for the LLM.

        Formats each result's metadata and description as a labeled block;
        blocks are separated by '---' lines.
        """
        context_parts = []
        for result in search_results:
            metadata = result['metadata']
            context_parts.append(
                f"Product: {metadata['product_name']}\n"
                f"Brand: {metadata['brand']}\n"
                f"Price: ${metadata['price']}\n"
                f"Weight: {metadata['weight']}kg\n"
                f"Dog Type: {metadata['dog_type']}\n"
                f"Description: {result['text']}\n"
            )
        return "\n---\n".join(context_parts)

    def process_query(self, query: str) -> Dict[str, Any]:
        """Process a user query and return both search results and answer.

        Returns:
            Dict with 'answer' (str), 'search_results' (merged hits) and
            'language' ('es' or 'en').
        """
        search_results = self.hybrid_search(query)
        answer = self.generate_answer(query, search_results)
        return {
            "answer": answer,
            "search_results": search_results,
            "language": detect_language(query)
        }

    def diagnose_vector_store(self):
        """Diagnose the vector store setup.

        Logs collection name/size and runs a one-result test query.

        Returns:
            bool: True when the test query returns at least one hit;
            False for an empty store, an empty test result, or any error.
        """
        try:
            logging.info("\nDiagnosing Vector Store:")
            collection_info = self.collection.get()

            # Basic collection info
            doc_count = len(collection_info['ids'])
            logging.info(f"Collection name: {self.collection.name}")
            logging.info(f"Number of documents: {doc_count}")

            # Sample query test
            if doc_count > 0:
                test_query = "test query for diagnosis"
                test_embedding = openai.embeddings.create(
                    model="text-embedding-ada-002",
                    input=test_query
                ).data[0].embedding
                test_results = self.collection.query(
                    query_embeddings=[test_embedding],
                    n_results=1
                )
                if len(test_results['ids'][0]) > 0:
                    logging.info("✅ Vector store test query successful")
                    return True
                else:
                    logging.error("❌ Vector store returned no results for test query")
                    return False
            else:
                logging.error("❌ Vector store is empty")
                return False
        except Exception as e:
            logging.error(f"❌ Error accessing vector store: {str(e)}")
            return False