| import re
|
| import asyncio
|
| import httpx
|
| import unicodedata
|
| from typing import Dict, List, Tuple, Any, Optional
|
| from rapidfuzz import process, fuzz
|
|
|
class SmartSearch:
    """Fuzzy search index over films, TV series, and episodes fetched from remote JSON."""

    def __init__(self, films_url: str, tv_series_url: str):
        """
        Args:
            films_url: URL returning a JSON list of film path strings.
            tv_series_url: URL returning a JSON mapping of series path ->
                list of episode dicts (each with 'episode', 'path', 'season').
        """
        self.films_url = films_url
        self.tv_series_url = tv_series_url
        # 'films'/'series' map normalized title -> original path string;
        # 'episodes' holds parallel lists: composite search keys and record dicts.
        self.data = {
            'films': {},
            'series': {},
            'episodes': {'keys': [], 'data': []}
        }
        self.lock = asyncio.Lock()  # serializes index rebuilds in initialize()
        self.is_initialized = False

    @staticmethod
    def normalize_text(text: str) -> str:
        """Normalize text for consistent searching.

        Strips accents (NFKD decomposition + combining-mark removal), drops a
        leading 'films/' or 'tv/' path prefix, removes punctuation, converts
        underscores to spaces, and lowercases the result.
        """
        text = unicodedata.normalize('NFKD', text)
        text = ''.join(c for c in text if not unicodedata.combining(c))
        text = re.sub(r'^(films|tv)/', '', text, flags=re.IGNORECASE)
        # NOTE: this pass already deletes '-' (it is neither \w nor \s), so the
        # [\-_] substitution below only ever converts underscores to spaces.
        text = re.sub(r'[^\w\s]', '', text)
        text = re.sub(r'[\-_]', ' ', text)
        return text.lower().strip()

    async def fetch_data(self, url: str, max_retries: int = 5) -> Any:
        """Fetch and decode JSON from *url*, retrying transient network errors.

        Only timeouts and connection errors are retried, with exponential
        backoff (2**attempt seconds). HTTP status errors and any other
        exception propagate immediately.

        Args:
            url: Endpoint to GET.
            max_retries: Attempts before giving up on transient errors.

        Returns:
            The decoded JSON payload, or None if max_retries <= 0.

        Raises:
            httpx.ReadTimeout / httpx.ConnectError: after max_retries failures.
            httpx.HTTPStatusError: on a non-2xx response.
        """
        async with httpx.AsyncClient(timeout=30.0) as client:
            for attempt in range(max_retries):
                try:
                    response = await client.get(url)
                    response.raise_for_status()
                    return response.json()
                except (httpx.ReadTimeout, httpx.ConnectError):
                    if attempt == max_retries - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)
                # Original also had `except Exception as e: raise` here — a
                # no-op re-raise with an unused binding; removed (identical
                # behavior: the exception propagates either way).
        # Only reachable when max_retries <= 0.
        return None

    async def load_data(self) -> Tuple[Dict, Dict, Tuple[List, List]]:
        """Load and normalize all data sources.

        Returns:
            (film_data, series_data, (episode_keys, episode_records)):
            film_data/series_data map normalized titles to original paths;
            episode_keys are "<series> <episode>" composite search strings,
            index-aligned with episode_records, which carry
            series/title/path/season per episode.
            (Annotation fixed: the original declared Tuple[Dict, Dict] but
            has always returned this 3-tuple.)
        """
        films, series = await asyncio.gather(
            self.fetch_data(self.films_url),
            self.fetch_data(self.tv_series_url))

        film_data = {self.normalize_text(film): film for film in films}
        series_data = {}
        episode_keys = []
        episode_records = []

        for series_path, episodes in series.items():
            normalized_series = self.normalize_text(series_path)
            series_data[normalized_series] = series_path

            for episode in episodes:
                norm_episode = self.normalize_text(episode['episode'])
                # Composite key lets one query match series name and episode
                # title together (e.g. "breaking bad pilot").
                composite_key = f"{normalized_series} {norm_episode}"
                episode_keys.append(composite_key)
                episode_records.append({
                    'series': series_path,
                    'title': episode['episode'],
                    'path': episode['path'],
                    'season': episode['season']
                })

        return film_data, series_data, (episode_keys, episode_records)

    async def initialize(self):
        """Build (or rebuild) the in-memory search index under the lock."""
        async with self.lock:
            films, series, episodes = await self.load_data()
            self.data = {
                'films': films,
                'series': series,
                'episodes': {'keys': episodes[0], 'data': episodes[1]}
            }
            self.is_initialized = True

    async def search(self, query: str, limit: int = 20) -> Dict[str, List]:
        """Perform a fuzzy search across films, series, and episodes.

        Args:
            query: Raw user query; normalized before matching.
            limit: Maximum results per category.

        Returns:
            {'films': [...], 'series': [...], 'episodes': [...]} — films and
            series as original path strings, episodes as record dicts.
        """
        if not self.is_initialized:
            await self.initialize()

        query = self.normalize_text(query)
        results = {'films': [], 'series': [], 'episodes': []}

        # Each match tuple is (key, score, index); token_set_ratio tolerates
        # word reordering and partial overlap.
        film_matches = process.extract(
            query, self.data['films'].keys(),
            scorer=fuzz.token_set_ratio, score_cutoff=30, limit=limit
        )
        results['films'] = [self.data['films'][m[0]] for m in film_matches]

        series_matches = process.extract(
            query, self.data['series'].keys(),
            scorer=fuzz.token_set_ratio, score_cutoff=45, limit=limit
        )
        results['series'] = [self.data['series'][m[0]] for m in series_matches]

        # Over-fetch (limit*2) because duplicates are filtered out below.
        episode_matches = process.extract(
            query, self.data['episodes']['keys'],
            scorer=fuzz.WRatio, score_cutoff=55, limit=limit*2
        )
        seen_episodes = set()
        for match in episode_matches:
            idx = match[2]  # index into the parallel 'data' list
            episode = self.data['episodes']['data'][idx]
            unique_key = f"{episode['path']}-{episode['season']}"
            if unique_key not in seen_episodes:
                results['episodes'].append(episode)
                seen_episodes.add(unique_key)
                if len(results['episodes']) >= limit:
                    break

        return results

    async def update_data(self):
        """Refresh the search index (alias for initialize())."""
        await self.initialize()