# NOTE(review): removed stray build-log lines ("Spaces:", "Build error") — paste artifact, not code.
# services/data_service.py
import io
import logging
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, List, Optional

import aiohttp
import faiss
import numpy as np
import pandas as pd

from config.config import settings

logger = logging.getLogger(__name__)
class DataService:
    """Fetch product CSV data, build a FAISS L2 index over product text,
    and serve nearest-neighbour product search.

    The cleaned DataFrame and the FAISS index are cached together and
    rebuilt only when the cache is older than ``settings.CACHE_DURATION``
    seconds.
    """

    def __init__(self, model_service):
        # Sentence embedder shared with the rest of the application.
        self.embedder = model_service.embedder
        # Cache payload: {'data': cleaned DataFrame, 'index': faiss index}.
        self.cache = {}
        # Timestamp of the last successful prepare_data_and_index() run.
        self.last_update = None
        self.faiss_index = None
        self.data_cleaned = None

    async def fetch_csv_data(self) -> pd.DataFrame:
        """Download the pipe-separated product CSV with retries.

        Returns:
            The parsed DataFrame.

        Raises:
            Exception: the last failure, after ``settings.MAX_RETRIES``
            attempts — instead of the original implicit ``None`` return.
        """
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        # Treat a non-200 status as a retryable failure;
                        # previously it fell through and the coroutine
                        # could return None implicitly.
                        if response.status != 200:
                            raise RuntimeError(
                                f"Unexpected status {response.status} fetching CSV"
                            )
                        content = await response.text()
                        # BUG FIX: pandas has no StringIO attribute;
                        # pd.StringIO raised AttributeError on every
                        # successful download. Use io.StringIO.
                        return pd.read_csv(io.StringIO(content), sep='|')
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1} failed: {e}")
                    if attempt == settings.MAX_RETRIES - 1:
                        raise
        # Defensive terminal raise (e.g. MAX_RETRIES == 0) so callers never
        # receive None.
        raise RuntimeError("CSV download failed: no attempts succeeded")

    async def prepare_data_and_index(self) -> tuple:
        """Return ``(cleaned_data, faiss_index)``, rebuilding when stale.

        A rebuild downloads the CSV, cleans it, embeds a weighted text
        combination per product, and indexes the embeddings with
        ``faiss.IndexFlatL2``.
        """
        current_time = datetime.now()
        # BUG FIX: timedelta.seconds wraps every 24 hours, so a day-old
        # cache looked fresh again; total_seconds() is the real age.
        if (self.last_update
                and (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION
                and self.cache):
            return self.cache['data'], self.cache['index']

        data = await self.fetch_csv_data()

        # Keep only the columns the search layer exposes.
        columns_to_keep = [
            'ID', 'Name', 'Description', 'Price',
            'ProductCategory', 'Grammage',
            'BasePriceText', 'Rating', 'RatingCount',
            'Ingredients', 'CreationDate', 'Keywords', 'Brand'
        ]
        self.data_cleaned = data[columns_to_keep].copy()
        # De-noise descriptions: replace any character outside the
        # whitelisted set with a space.
        self.data_cleaned['Description'] = self.data_cleaned['Description'].str.replace(
            r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
        )

        # Text used for embedding; the name is repeated to double its
        # weight relative to description/keywords/category.
        self.data_cleaned['combined_text'] = self.data_cleaned.apply(
            lambda row: (
                f"{row['Name']} {row['Name']} "  # Double weight for name
                f"{row['Description']} "
                f"{row['Keywords'] if pd.notnull(row['Keywords']) else ''} "
                f"{row['ProductCategory'] if pd.notnull(row['ProductCategory']) else ''}"
            ).strip(),
            axis=1
        )

        embeddings = self.embedder.encode(
            self.data_cleaned['combined_text'].tolist(),
            convert_to_tensor=True,
            show_progress_bar=True
        ).cpu().detach().numpy()
        # FAISS requires contiguous float32 input; coerce defensively.
        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)

        d = embeddings.shape[1]
        self.faiss_index = faiss.IndexFlatL2(d)
        self.faiss_index.add(embeddings)

        # Cache both artifacts together so they always stay in sync.
        self.cache = {
            'data': self.data_cleaned,
            'index': self.faiss_index
        }
        self.last_update = current_time
        return self.data_cleaned, self.faiss_index

    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``top_k`` products ranked by L2 distance to ``query``.

        Each result is the product row as a dict plus a ``'score'`` key
        holding the (lower-is-better) L2 distance.
        """
        if self.faiss_index is None:
            await self.prepare_data_and_index()
        query_embedding = self.embedder.encode(
            [query], convert_to_tensor=True
        ).cpu().detach().numpy()
        query_embedding = np.ascontiguousarray(query_embedding, dtype=np.float32)
        distances, indices = self.faiss_index.search(query_embedding, top_k)
        results = []
        for rank, idx in enumerate(indices[0]):
            # BUG FIX: FAISS pads with -1 when the index holds fewer than
            # top_k vectors; .iloc[-1] would silently return the last row.
            if idx < 0:
                continue
            product = self.data_cleaned.iloc[int(idx)].to_dict()
            product['score'] = float(distances[0][rank])
            results.append(product)
        return results