| |
| from typing import List, Dict, Any, Optional, Tuple |
| import pandas as pd |
| import faiss |
| import numpy as np |
| import aiohttp |
| from datetime import datetime |
| import logging |
| from config.config import settings |
| from functools import lru_cache |
| from io import StringIO |
|
|
| logger = logging.getLogger(__name__) |
|
|
class DataService:
    """Loads the product catalogue from a remote CSV feed, embeds each
    product with a sentence encoder, and serves vector-similarity search
    over a FAISS index.

    The prepared (DataFrame, index) pair is cached in memory and reused for
    settings.CACHE_DURATION seconds before being rebuilt.
    """

    def __init__(self, model_service):
        # Shared sentence-embedding model; must provide .encode().
        self.embedder = model_service.embedder
        self.cache = {}          # {'data': DataFrame, 'index': faiss index}
        self.last_update = None  # datetime of the last successful build
        self.faiss_index = None
        self.data_cleaned = None

    async def fetch_csv_data(self) -> pd.DataFrame:
        """Fetch the pipe-delimited CSV from settings.CSV_URL with retry logic.

        Returns:
            The parsed DataFrame on success, or an empty DataFrame if every
            attempt returned a non-200 status.

        Raises:
            Exception: the last error, if all settings.MAX_RETRIES attempts
                raised (non-200 responses are logged but do not raise).
        """
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        if response.status == 200:
                            content = await response.text()
                            # The feed uses '|' as its field separator.
                            return pd.read_csv(StringIO(content), sep='|')
                        logger.error(f"Failed to fetch data: HTTP {response.status}")
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1} failed: {e}", exc_info=True)
                    if attempt == settings.MAX_RETRIES - 1:
                        raise
        return pd.DataFrame()

    async def prepare_data_and_index(self) -> Tuple[pd.DataFrame, Any]:
        """Fetch, clean, embed and index the catalogue, with caching.

        Returns:
            (cleaned DataFrame, FAISS index) on success, or
            (empty DataFrame, None) when fetching or preparation fails.
        """
        try:
            current_time = datetime.now()

            # Serve from cache while it is still fresh.
            # BUGFIX: use total_seconds() — timedelta.seconds wraps around
            # at 24 hours, so a days-old cache used to look fresh again.
            if (self.cache and self.last_update and
                    (current_time - self.last_update).total_seconds()
                    < settings.CACHE_DURATION):
                return self.cache['data'], self.cache['index']

            data = await self.fetch_csv_data()
            if data.empty:
                logger.error("Failed to fetch data")
                return pd.DataFrame(), None

            # Keep only the columns that search results need to expose.
            columns_to_keep = [
                'ID', 'Name', 'Description', 'Price',
                'ProductCategory', 'Grammage',
                'BasePriceText', 'Rating', 'RatingCount',
                'Ingredients', 'CreationDate', 'Keywords', 'Brand'
            ]
            self.data_cleaned = data[columns_to_keep].copy()

            # Replace characters outside a conservative whitelist with spaces
            # so descriptions embed cleanly.
            self.data_cleaned['Description'] = (
                self.data_cleaned['Description'].astype(str).str.replace(
                    r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
                )
            )

            # Text fed to the embedder; the name appears twice to weight it
            # above description/keywords/category.
            self.data_cleaned['combined_text'] = self.data_cleaned.apply(
                lambda row: (
                    f"{row['Name']} {row['Name']} "
                    f"{str(row['Description'])} "
                    f"{str(row['Keywords']) if pd.notnull(row['Keywords']) else ''} "
                    f"{str(row['ProductCategory']) if pd.notnull(row['ProductCategory']) else ''}"
                ).strip(),
                axis=1
            )

            embeddings = self.embedder.encode(
                self.data_cleaned['combined_text'].tolist(),
                convert_to_tensor=True,
                show_progress_bar=True
            ).cpu().detach().numpy()

            # Exact (brute-force) L2 index over the product embeddings.
            d = embeddings.shape[1]
            self.faiss_index = faiss.IndexFlatL2(d)
            self.faiss_index.add(embeddings)

            self.cache = {
                'data': self.data_cleaned,
                'index': self.faiss_index
            }
            self.last_update = current_time

            return self.data_cleaned, self.faiss_index

        except Exception as e:
            logger.error(f"Error in prepare_data_and_index: {e}", exc_info=True)
            return pd.DataFrame(), None

    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to top_k products most similar to `query`.

        Each result is a JSON-friendly dict of the product's columns plus a
        'score' key holding the L2 distance (lower is more similar).
        Returns [] when the index cannot be built or an error occurs.
        """
        try:
            # BUGFIX: compare against None — the truthiness of a FAISS index
            # object does not indicate whether it is missing.
            if self.faiss_index is None:
                self.data_cleaned, self.faiss_index = await self.prepare_data_and_index()
                if self.faiss_index is None:
                    return []

            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()

            distances, indices = self.faiss_index.search(query_embedding_np, top_k)

            results = []
            for i, idx in enumerate(indices[0]):
                # BUGFIX: FAISS pads `indices` with -1 when the index holds
                # fewer than top_k vectors; iloc[-1] would then silently
                # return the last row as a bogus match. Skip the padding.
                if idx < 0:
                    continue
                try:
                    row = self.data_cleaned.iloc[int(idx)]
                    product = {}
                    for column in self.data_cleaned.columns:
                        value = row[column]
                        # Convert numpy/pandas scalars to plain Python types
                        # so the result serializes cleanly (e.g. to JSON).
                        if isinstance(value, (np.integer, np.floating)):
                            value = value.item()
                        elif isinstance(value, pd.Timestamp):
                            value = value.isoformat()
                        elif isinstance(value, np.bool_):
                            value = bool(value)
                        product[column] = value

                    product['score'] = float(distances[0][i])
                    results.append(product)
                except Exception as e:
                    logger.error(f"Error processing search result {i}: {e}", exc_info=True)
                    continue

            return results

        except Exception as e:
            logger.error(f"Error in search: {e}", exc_info=True)
            return []