| | |
| | from typing import List, Dict, Any, Optional, Tuple |
| | import pandas as pd |
| | import faiss |
| | import numpy as np |
| | import aiohttp |
| | from datetime import datetime |
| | import logging |
| | from config.config import settings |
| | from functools import lru_cache |
| | from io import StringIO |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
class DataService:
    """Fetches product CSV data, builds a FAISS similarity index over the
    products' combined text fields, and serves nearest-neighbour searches.

    The cleaned DataFrame and FAISS index are cached in memory and reused
    until the cache is older than ``settings.CACHE_DURATION`` seconds, so
    repeated requests do not re-download or re-embed the catalogue.
    """

    def __init__(self, model_service):
        # Sentence-embedding model shared with the rest of the application.
        self.embedder = model_service.embedder
        # In-memory cache: {'data': cleaned DataFrame, 'index': FAISS index}.
        self.cache = {}
        # Timestamp of the last successful index build (None until first build).
        self.last_update = None
        # Exact-L2 FAISS index over the combined-text embeddings.
        self.faiss_index = None
        # Cleaned product DataFrame whose row order matches the index ids.
        self.data_cleaned = None

    async def fetch_csv_data(self) -> pd.DataFrame:
        """Fetch the pipe-separated product CSV from ``settings.CSV_URL``.

        Retries up to ``settings.MAX_RETRIES`` times. Re-raises the last
        exception if every attempt raises; returns an empty DataFrame if
        all attempts complete without ever receiving HTTP 200.
        """
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        if response.status == 200:
                            content = await response.text()
                            # The upstream file uses '|' as column separator.
                            return pd.read_csv(StringIO(content), sep='|')
                        else:
                            logger.error(f"Failed to fetch data: HTTP {response.status}")
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1} failed: {e}", exc_info=True)
                    if attempt == settings.MAX_RETRIES - 1:
                        raise
        return pd.DataFrame()

    @staticmethod
    def _clean_data(data: pd.DataFrame) -> pd.DataFrame:
        """Select the product columns of interest, sanitize descriptions,
        and add the 'combined_text' column used for embedding."""
        columns_to_keep = [
            'ID', 'Name', 'Description', 'Price',
            'ProductCategory', 'Grammage',
            'BasePriceText', 'Rating', 'RatingCount',
            'Ingredients', 'CreationDate', 'Keywords', 'Brand'
        ]
        cleaned = data[columns_to_keep].copy()

        # Replace characters outside an allow-list of word characters and
        # common punctuation/currency symbols with spaces.
        cleaned['Description'] = cleaned['Description'].astype(str).str.replace(
            r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
        )

        # Name appears twice to up-weight it in the embedding; Keywords and
        # ProductCategory may be NaN and are then omitted.
        cleaned['combined_text'] = cleaned.apply(
            lambda row: (
                f"{row['Name']} {row['Name']} "
                f"{str(row['Description'])} "
                f"{str(row['Keywords']) if pd.notnull(row['Keywords']) else ''} "
                f"{str(row['ProductCategory']) if pd.notnull(row['ProductCategory']) else ''}"
            ).strip(),
            axis=1
        )
        return cleaned

    async def prepare_data_and_index(self) -> Tuple[pd.DataFrame, Any]:
        """Return ``(cleaned DataFrame, FAISS index)``, rebuilding only when
        the in-memory cache is stale.

        Returns ``(empty DataFrame, None)`` on any failure.
        """
        try:
            current_time = datetime.now()

            # Serve from cache while it is still fresh.
            # BUGFIX: use total_seconds() — timedelta.seconds wraps at 24 h,
            # so a cache older than one day would look fresh forever.
            if (self.last_update is not None
                    and (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION
                    and self.cache):
                return self.cache['data'], self.cache['index']

            data = await self.fetch_csv_data()
            if data.empty:
                logger.error("Failed to fetch data")
                return pd.DataFrame(), None

            self.data_cleaned = self._clean_data(data)

            # Embed every product's combined text; move off the accelerator
            # into a plain numpy array for FAISS.
            embeddings = self.embedder.encode(
                self.data_cleaned['combined_text'].tolist(),
                convert_to_tensor=True,
                show_progress_bar=True
            ).cpu().detach().numpy()

            # Exact (brute-force) L2 index; row i of the DataFrame is id i.
            d = embeddings.shape[1]
            self.faiss_index = faiss.IndexFlatL2(d)
            self.faiss_index.add(embeddings)

            self.cache = {
                'data': self.data_cleaned,
                'index': self.faiss_index
            }
            self.last_update = current_time

            return self.data_cleaned, self.faiss_index

        except Exception as e:
            logger.error(f"Error in prepare_data_and_index: {e}", exc_info=True)
            return pd.DataFrame(), None

    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``top_k`` products most similar to ``query``.

        Each result dict contains the product's columns converted to plain
        JSON-serializable Python types, plus a ``'score'`` key holding the
        L2 distance (lower = more similar). Returns ``[]`` on failure.
        """
        try:
            # Build the index lazily on first use. BUGFIX: compare with
            # `is None` — a FAISS index object must not be truth-tested.
            if self.faiss_index is None:
                self.data_cleaned, self.faiss_index = await self.prepare_data_and_index()
                if self.faiss_index is None:
                    return []

            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()

            distances, indices = self.faiss_index.search(query_embedding_np, top_k)

            results = []
            for i, idx in enumerate(indices[0]):
                # BUGFIX: FAISS pads with -1 when fewer than top_k vectors
                # exist; iloc[-1] would otherwise return the last row as a
                # bogus match.
                if idx < 0:
                    continue
                try:
                    product = {}
                    row = self.data_cleaned.iloc[idx]
                    for column in self.data_cleaned.columns:
                        value = row[column]
                        # Convert numpy/pandas scalars to native Python types
                        # so the result is JSON-serializable.
                        if isinstance(value, (np.integer, np.floating)):
                            value = value.item()
                        elif isinstance(value, pd.Timestamp):
                            value = value.isoformat()
                        elif isinstance(value, np.bool_):
                            value = bool(value)
                        product[column] = value

                    product['score'] = float(distances[0][i])
                    results.append(product)
                except Exception as e:
                    logger.error(f"Error processing search result {i}: {e}", exc_info=True)
                    continue

            return results

        except Exception as e:
            logger.error(f"Error in search: {e}", exc_info=True)
            return []