Spaces:
Build error
Build error
# services/data_service.py

# Standard library
import logging
from datetime import datetime
from functools import lru_cache
from io import StringIO  # explicit import so the HTTP response body can feed read_csv
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import aiohttp
import faiss
import numpy as np
import pandas as pd

# Local
from config.config import settings

logger = logging.getLogger(__name__)
class DataService:
    """Product-data service: fetches a pipe-delimited CSV feed, cleans it,
    embeds the text columns, and answers similarity queries via a FAISS
    L2 index.

    The cleaned DataFrame and the index are cached in memory and reused
    until ``settings.CACHE_DURATION`` seconds have elapsed.
    """

    def __init__(self, model_service):
        # Sentence-embedding model shared with the rest of the application.
        self.embedder = model_service.embedder
        self.cache = {}          # {'data': cleaned DataFrame, 'index': FAISS index}
        self.last_update = None  # datetime of the last successful index build
        self.faiss_index = None
        self.data_cleaned = None

    async def fetch_csv_data(self) -> pd.DataFrame:
        """Fetch the product CSV from ``settings.CSV_URL`` with retry logic.

        Returns:
            Parsed DataFrame (the feed is pipe-separated), or an empty
            DataFrame when every attempt got a non-200 response.

        Raises:
            Exception: re-raises the last network/parse error once
                ``settings.MAX_RETRIES`` attempts are exhausted.
        """
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        if response.status == 200:
                            content = await response.text()
                            # Feed uses '|' as the field separator, not ','.
                            return pd.read_csv(StringIO(content), sep='|')
                        logger.error(f"Failed to fetch data: HTTP {response.status}")
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1} failed: {e}", exc_info=True)
                    if attempt == settings.MAX_RETRIES - 1:
                        raise
        return pd.DataFrame()  # All attempts returned non-200 responses

    async def prepare_data_and_index(self) -> Tuple[pd.DataFrame, Any]:
        """Return ``(cleaned_data, faiss_index)``, rebuilding only when the
        in-memory cache is older than ``settings.CACHE_DURATION`` seconds.

        Returns:
            The cleaned DataFrame and index on success;
            ``(empty DataFrame, None)`` on any failure.
        """
        try:
            current_time = datetime.now()
            # Serve from cache while it is still fresh.
            # BUGFIX: use total_seconds() — timedelta.seconds wraps at day
            # boundaries, so a >24h-old cache could look fresh again.
            if (self.last_update and self.cache and
                    (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION):
                return self.cache['data'], self.cache['index']

            data = await self.fetch_csv_data()
            if data.empty:
                logger.error("Failed to fetch data")
                return pd.DataFrame(), None

            # Keep only the columns exposed downstream.
            columns_to_keep = [
                'ID', 'Name', 'Description', 'Price',
                'ProductCategory', 'Grammage',
                'BasePriceText', 'Rating', 'RatingCount',
                'Ingredients', 'CreationDate', 'Keywords', 'Brand'
            ]
            self.data_cleaned = data[columns_to_keep].copy()

            # Replace characters outside a conservative whitelist with spaces.
            self.data_cleaned['Description'] = self.data_cleaned['Description'].astype(str).str.replace(
                r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
            )

            # Text used for embedding; the name appears twice to give it
            # double weight relative to description/keywords/category.
            self.data_cleaned['combined_text'] = self.data_cleaned.apply(
                lambda row: (
                    f"{row['Name']} {row['Name']} "  # Double weight for name
                    f"{str(row['Description'])} "
                    f"{str(row['Keywords']) if pd.notnull(row['Keywords']) else ''} "
                    f"{str(row['ProductCategory']) if pd.notnull(row['ProductCategory']) else ''}"
                ).strip(),
                axis=1
            )

            # Embed every product and build a flat (exact) L2 index.
            embeddings = self.embedder.encode(
                self.data_cleaned['combined_text'].tolist(),
                convert_to_tensor=True,
                show_progress_bar=True
            ).cpu().detach().numpy()
            d = embeddings.shape[1]
            self.faiss_index = faiss.IndexFlatL2(d)
            self.faiss_index.add(embeddings)

            # Refresh the cache only after a fully successful build.
            self.cache = {
                'data': self.data_cleaned,
                'index': self.faiss_index
            }
            self.last_update = current_time
            return self.data_cleaned, self.faiss_index
        except Exception as e:
            logger.error(f"Error in prepare_data_and_index: {e}", exc_info=True)
            return pd.DataFrame(), None

    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``top_k`` products most similar to ``query``.

        Each result dict holds the cleaned columns (converted to native
        Python types) plus a ``'score'`` key with the L2 distance
        (smaller = more similar). Returns ``[]`` on any failure.
        """
        try:
            # Lazily build the index on first use.
            if self.faiss_index is None:
                self.data_cleaned, self.faiss_index = await self.prepare_data_and_index()
                if self.faiss_index is None:
                    return []

            # Embed the query the same way the corpus was embedded.
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()

            distances, indices = self.faiss_index.search(query_embedding_np, top_k)

            results = []
            for i, idx in enumerate(indices[0]):
                # BUGFIX: FAISS pads with -1 when the index holds fewer than
                # top_k vectors; previously iloc[-1] silently returned the
                # last row for those placeholders.
                if idx < 0:
                    continue
                try:
                    product = {}
                    row = self.data_cleaned.iloc[idx]
                    for column in self.data_cleaned.columns:
                        value = row[column]
                        # Convert numpy/pandas scalars so results are JSON-safe.
                        if isinstance(value, (np.integer, np.floating)):
                            value = value.item()
                        elif isinstance(value, pd.Timestamp):
                            value = value.isoformat()
                        elif isinstance(value, np.bool_):
                            value = bool(value)
                        product[column] = value
                    product['score'] = float(distances[0][i])
                    results.append(product)
                except Exception as e:
                    # One bad row must not abort the whole result set.
                    logger.error(f"Error processing search result {i}: {e}", exc_info=True)
                    continue
            return results
        except Exception as e:
            logger.error(f"Error in search: {e}", exc_info=True)
            return []