# smart_searchV2.py
# Uploaded by ChandimaPrabath (commit 8d008af, verified)
import re
import asyncio
import httpx
import unicodedata
from typing import Dict, List, Tuple, Any, Optional
from rapidfuzz import process, fuzz
class SmartSearch:
    """Fuzzy search index over film and TV-series catalogues.

    Catalogue data is fetched from two JSON endpoints, normalized into
    lookup tables keyed by a cleaned-up form of each title, and queried
    with RapidFuzz scorers across films, series, and episodes.
    """

    def __init__(self, films_url: str, tv_series_url: str):
        """Store the data-source URLs and set up empty indexes.

        Args:
            films_url: JSON endpoint returning a list of film paths.
            tv_series_url: JSON endpoint returning a mapping of
                series path -> list of episode dicts; each episode dict
                is expected to carry 'episode', 'path' and 'season' keys
                (see load_data).
        """
        self.films_url = films_url
        self.tv_series_url = tv_series_url
        # 'films'/'series' map normalized title -> original path;
        # 'episodes' holds parallel lists so a RapidFuzz match index
        # into 'keys' selects the corresponding record in 'data'.
        self.data = {
            'films': {},
            'series': {},
            'episodes': {'keys': [], 'data': []}
        }
        # Serializes index (re)builds so concurrent searches never see
        # a half-populated index.
        self.lock = asyncio.Lock()
        self.is_initialized = False

    @staticmethod
    def normalize_text(text: str) -> str:
        """Normalize text for consistent searching.

        Strips accents, a leading 'films/' or 'tv/' prefix, converts
        hyphens/underscores to spaces, removes remaining punctuation,
        and lower-cases the result.
        """
        # Decompose accented characters, then drop the combining marks
        # (e.g. "Café" -> "Cafe").
        text = unicodedata.normalize('NFKD', text)
        text = ''.join(c for c in text if not unicodedata.combining(c))
        text = re.sub(r'^(films|tv)/', '', text, flags=re.IGNORECASE)
        # FIX: turn separators into spaces BEFORE stripping punctuation.
        # The previous order deleted hyphens via [^\w\s], so
        # "Spider-Man" collapsed to "spiderman" instead of "spider man",
        # leaving the hyphen half of this rule dead.
        text = re.sub(r'[\-_]', ' ', text)
        text = re.sub(r'[^\w\s]', '', text)
        return text.lower().strip()

    async def fetch_data(self, url: str, max_retries: int = 5) -> Any:
        """Fetch JSON from *url*, retrying transient network failures.

        Retries timeouts/connection errors with exponential backoff
        (2**attempt seconds) and re-raises after the final attempt.
        Any other error propagates immediately.

        Returns:
            The decoded JSON payload, or None when max_retries <= 0.
        """
        async with httpx.AsyncClient(timeout=30.0) as client:
            for attempt in range(max_retries):
                try:
                    response = await client.get(url)
                    response.raise_for_status()
                    return response.json()
                except (httpx.ReadTimeout, httpx.ConnectError):
                    if attempt == max_retries - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)
        # Only reachable when the retry loop never runs (max_retries <= 0).
        return None

    async def load_data(self) -> Tuple[Dict, Dict, Tuple[List, List]]:
        """Load and normalize all data sources.

        Returns:
            (film_data, series_data, (episode_keys, episode_records))
            where film_data/series_data map normalized titles to
            original paths, and episode_keys/episode_records are
            parallel lists for index-based episode lookup.

        Note: the original annotation claimed a 2-tuple; the method has
        always returned three items, so the annotation was corrected.
        """
        films, series = await asyncio.gather(
            self.fetch_data(self.films_url),
            self.fetch_data(self.tv_series_url))
        film_data = {self.normalize_text(film): film for film in films}
        series_data = {}
        episode_keys = []
        episode_records = []
        for series_path, episodes in series.items():
            normalized_series = self.normalize_text(series_path)
            series_data[normalized_series] = series_path
            for episode in episodes:
                norm_episode = self.normalize_text(episode['episode'])
                # Composite "<series> <episode>" key lets one fuzzy pass
                # match on series name, episode title, or both.
                episode_keys.append(f"{normalized_series} {norm_episode}")
                episode_records.append({
                    'series': series_path,
                    'title': episode['episode'],
                    'path': episode['path'],
                    'season': episode['season']
                })
        return film_data, series_data, (episode_keys, episode_records)

    async def initialize(self):
        """Build (or rebuild) the search index under the lock."""
        async with self.lock:
            films, series, episodes = await self.load_data()
            self.data = {
                'films': films,
                'series': series,
                'episodes': {'keys': episodes[0], 'data': episodes[1]}
            }
            self.is_initialized = True

    async def search(self, query: str, limit: int = 20) -> Dict[str, List]:
        """Perform a fuzzy search across films, series and episodes.

        Args:
            query: Free-text search string (normalized before matching).
            limit: Maximum results per category.

        Returns:
            Dict with 'films', 'series' and 'episodes' result lists.
        """
        # Lazily build the index on first use.
        if not self.is_initialized:
            await self.initialize()
        query = self.normalize_text(query)
        results = {'films': [], 'series': [], 'episodes': []}
        # Films: token_set_ratio tolerates extra/missing words, with a
        # low cutoff for flexible matching.
        film_matches = process.extract(
            query, self.data['films'].keys(),
            scorer=fuzz.token_set_ratio, score_cutoff=30, limit=limit
        )
        results['films'] = [self.data['films'][key] for key, _score, _idx in film_matches]
        # Series: same scorer, stricter cutoff.
        series_matches = process.extract(
            query, self.data['series'].keys(),
            scorer=fuzz.token_set_ratio, score_cutoff=45, limit=limit
        )
        results['series'] = [self.data['series'][key] for key, _score, _idx in series_matches]
        # Episodes: WRatio blends several scorers; over-fetch (limit*2)
        # because duplicates are filtered below.
        episode_matches = process.extract(
            query, self.data['episodes']['keys'],
            scorer=fuzz.WRatio, score_cutoff=55, limit=limit * 2
        )
        seen_episodes = set()
        for _key, _score, idx in episode_matches:
            # idx is the original position, so it indexes the parallel
            # 'data' list directly.
            episode = self.data['episodes']['data'][idx]
            unique_key = f"{episode['path']}-{episode['season']}"
            if unique_key not in seen_episodes:
                results['episodes'].append(episode)
                seen_episodes.add(unique_key)
                if len(results['episodes']) >= limit:
                    break
        return results

    async def update_data(self):
        """Refresh the search index (alias for initialize)."""
        await self.initialize()