| """LLM-based recommender service for travel planning.""" |
|
|
| import concurrent.futures |
| import hashlib |
| import json |
| import logging |
| import math |
| import os |
| import re |
| import threading |
| import time |
| import urllib.request |
| import urllib.parse |
| import urllib.error |
|
|
| from dataclasses import dataclass |
|
|
| from openai import OpenAI |
|
|
| from utils.prompts import PROMPT_MAP, CATEGORY_GUIDANCE |
|
|
| |
| _ROAMIFY_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) |
|
|
| |
| _GEOCODE_CACHE_FILE = os.path.join(_ROAMIFY_ROOT, ".cache", "geocode_cache.json") |
| _GEOCODE_CACHE_LOCK = threading.Lock() |
|
|
|
|
def _load_geocode_cache() -> None:
    """Merge the on-disk geocode cache into ``_GEOCODE_CACHE``.

    This runs while the module is being imported, so it is strictly
    best-effort: a missing, unreadable, or corrupt cache file leaves the
    in-memory cache untouched instead of raising.
    """
    try:
        with open(_GEOCODE_CACHE_FILE, encoding="utf-8") as fh:
            stored = json.load(fh)
    except (OSError, ValueError):
        # OSError covers FileNotFoundError, PermissionError, IsADirectoryError;
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return
    if isinstance(stored, dict):
        _GEOCODE_CACHE.update(stored)
|
|
|
|
def _save_geocode_cache() -> None:
    """Write ``_GEOCODE_CACHE`` to ``_GEOCODE_CACHE_FILE`` as JSON.

    Best-effort: any failure is logged at debug level and otherwise ignored,
    so persisting the cache can never break a lookup.
    """
    try:
        with _GEOCODE_CACHE_LOCK:
            # Serialise a copy before touching the file: other threads may add
            # entries meanwhile, and a failure while dumping straight into an
            # already-truncated file would leave a corrupt cache behind.
            payload = json.dumps(dict(_GEOCODE_CACHE))
            # The cache directory is not created anywhere else.
            os.makedirs(os.path.dirname(_GEOCODE_CACHE_FILE), exist_ok=True)
            tmp_path = f"{_GEOCODE_CACHE_FILE}.{os.getpid()}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                fh.write(payload)
            # Atomic swap: readers see either the old or the new file.
            os.replace(tmp_path, _GEOCODE_CACHE_FILE)
    except Exception:
        logging.getLogger("roamify").debug(
            "Could not persist geocode cache", exc_info=True
        )
|
|
|
|
| |
# JSON file backing the LLM response cache, and the lock taken while writing it.
_LLM_CACHE_FILE = os.path.join(
    _ROAMIFY_ROOT,
    ".cache",
    "llm_cache.json",
)
_LLM_CACHE_LOCK = threading.Lock()
|
|
|
|
def _load_llm_cache() -> None:
    """Merge the on-disk LLM cache into ``_LLM_CACHE``.

    Keys are stored on disk as JSON-encoded arrays and are turned back into
    tuples here. This runs during module import, so it is best-effort: an
    unreadable or corrupt file is ignored, and a single malformed entry is
    skipped without discarding the entries around it.
    """
    try:
        with open(_LLM_CACHE_FILE, encoding="utf-8") as fh:
            stored = json.load(fh)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: invalid JSON or bytes.
        return
    if not isinstance(stored, dict):
        return
    for raw_key, value in stored.items():
        try:
            # TypeError: the key decoded to a non-iterable, or to an array
            # containing unhashable values.
            _LLM_CACHE[tuple(json.loads(raw_key))] = value
        except (ValueError, TypeError):
            continue
|
|
|
|
def _save_llm_cache() -> None:
    """Write ``_LLM_CACHE`` to ``_LLM_CACHE_FILE`` as JSON.

    Tuple keys are JSON-encoded into strings because JSON objects only allow
    string keys. Best-effort: failures are logged at debug level and ignored.
    """
    try:
        with _LLM_CACHE_LOCK:
            # Work on a copy so entries added by other threads cannot break
            # the iteration, and serialise before opening the file so a
            # failure never leaves a truncated cache on disk.
            snapshot = dict(_LLM_CACHE)
            payload = json.dumps({json.dumps(k): v for k, v in snapshot.items()})
            # The cache directory is not created anywhere else.
            os.makedirs(os.path.dirname(_LLM_CACHE_FILE), exist_ok=True)
            tmp_path = f"{_LLM_CACHE_FILE}.{os.getpid()}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                fh.write(payload)
            # Atomic swap: readers see either the old or the new file.
            os.replace(tmp_path, _LLM_CACHE_FILE)
    except Exception:
        logging.getLogger("roamify").debug(
            "Could not persist LLM cache", exc_info=True
        )
|
|
|
|
| |
# JSON file backing the image URL cache, and the lock taken while writing it.
_IMAGE_CACHE_FILE = os.path.join(
    _ROAMIFY_ROOT,
    ".cache",
    "image_cache.json",
)
_IMAGE_CACHE_LOCK = threading.Lock()
|
|
|
|
def _load_image_cache() -> None:
    """Merge the on-disk image cache into ``_IMAGE_CACHE``.

    Keys are stored on disk as JSON-encoded arrays and are turned back into
    tuples here. This runs during module import, so it is best-effort: an
    unreadable or corrupt file is ignored, and a single malformed entry is
    skipped without discarding the entries around it.
    """
    try:
        with open(_IMAGE_CACHE_FILE, encoding="utf-8") as fh:
            stored = json.load(fh)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: invalid JSON or bytes.
        return
    if not isinstance(stored, dict):
        return
    for raw_key, value in stored.items():
        try:
            # TypeError: the key decoded to a non-iterable, or to an array
            # containing unhashable values.
            _IMAGE_CACHE[tuple(json.loads(raw_key))] = value
        except (ValueError, TypeError):
            continue
|
|
|
|
def _save_image_cache() -> None:
    """Write ``_IMAGE_CACHE`` to ``_IMAGE_CACHE_FILE`` as JSON.

    Tuple keys are JSON-encoded into strings because JSON objects only allow
    string keys. Best-effort: failures are logged at debug level and ignored.
    """
    try:
        with _IMAGE_CACHE_LOCK:
            # Image lookups run on several worker threads that insert into
            # the cache without holding this lock, so iterate over a copy and
            # serialise before opening the file; otherwise a concurrent
            # insert could abort the dump and leave a truncated file.
            snapshot = dict(_IMAGE_CACHE)
            payload = json.dumps({json.dumps(k): v for k, v in snapshot.items()})
            # The cache directory is not created anywhere else.
            os.makedirs(os.path.dirname(_IMAGE_CACHE_FILE), exist_ok=True)
            tmp_path = f"{_IMAGE_CACHE_FILE}.{os.getpid()}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                fh.write(payload)
            # Atomic swap: readers see either the old or the new file.
            os.replace(tmp_path, _IMAGE_CACHE_FILE)
    except Exception:
        logging.getLogger("roamify").debug(
            "Could not persist image cache", exc_info=True
        )
|
|
| |
# In-memory caches. Each one is created empty and then filled from its JSON
# file by the matching loader.

# Nominatim results keyed by query string; None records "no result".
_GEOCODE_CACHE: dict[str, dict | None] = {}

# Image URLs keyed by (name, city, country); "" records "nothing found".
_IMAGE_CACHE: dict[tuple[str, str, str], str] = {}

# Parsed LLM responses keyed by a pair of strings.
_LLM_CACHE: dict[tuple[str, str], list[dict] | None] = {}

_load_geocode_cache()
_load_image_cache()
_load_llm_cache()

# Nominatim throttle state: the lock guards the timestamp of the last call.
_nominatim_lock = threading.Lock()
_nominatim_last_call: float = 0.0

# Stock-photo de-duplication: per city, how many times each content hash has
# been handed out. One hash may be used at most _MAX_STOCK_SHARING times.
_MAX_STOCK_SHARING = 4
_SEEN_CONTENT_HASHES: dict[str, dict[str, int]] = {}
_SEEN_CONTENT_HASHES_LOCK = threading.Lock()
|
|
| |
# JSON file backing the translation cache, and the lock taken while writing it.
_TRANSLATION_CACHE_FILE = os.path.join(
    _ROAMIFY_ROOT,
    ".cache",
    "translation_cache.json",
)
_TRANSLATION_CACHE_LOCK = threading.Lock()
|
|
|
|
def _load_translation_cache() -> None:
    """Merge the on-disk translation cache into ``_TRANSLATION_CACHE``.

    Keys are stored on disk as JSON-encoded arrays and are turned back into
    tuples here. This runs during module import, so it is best-effort: an
    unreadable or corrupt file is ignored, and a single malformed entry is
    skipped without discarding the entries around it.
    """
    try:
        with open(_TRANSLATION_CACHE_FILE, encoding="utf-8") as fh:
            stored = json.load(fh)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: invalid JSON or bytes.
        return
    if not isinstance(stored, dict):
        return
    for raw_key, value in stored.items():
        try:
            # TypeError: the key decoded to a non-iterable, or to an array
            # containing unhashable values.
            _TRANSLATION_CACHE[tuple(json.loads(raw_key))] = value
        except (ValueError, TypeError):
            continue
|
|
|
|
def _save_translation_cache() -> None:
    """Write ``_TRANSLATION_CACHE`` to ``_TRANSLATION_CACHE_FILE`` as JSON.

    Tuple keys are JSON-encoded into strings because JSON objects only allow
    string keys. Best-effort: failures are logged at debug level and ignored.
    """
    try:
        with _TRANSLATION_CACHE_LOCK:
            # Work on a copy so entries added by other threads cannot break
            # the iteration, and serialise before opening the file so a
            # failure never leaves a truncated cache on disk.
            snapshot = dict(_TRANSLATION_CACHE)
            payload = json.dumps({json.dumps(k): v for k, v in snapshot.items()})
            # The cache directory is not created anywhere else.
            os.makedirs(os.path.dirname(_TRANSLATION_CACHE_FILE), exist_ok=True)
            tmp_path = f"{_TRANSLATION_CACHE_FILE}.{os.getpid()}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                fh.write(payload)
            # Atomic swap: readers see either the old or the new file.
            os.replace(tmp_path, _TRANSLATION_CACHE_FILE)
    except Exception:
        logging.getLogger("roamify").debug(
            "Could not persist translation cache", exc_info=True
        )
|
|
|
|
| |
| |
# Translated result lists keyed by a pair of strings; filled from disk at import.
_TRANSLATION_CACHE: dict[tuple[str, str], list[dict]] = {}
_load_translation_cache()

# Articles, prepositions and conjunctions ignored when comparing an attraction
# name against a page or file title.
_STOP_WORDS = set("the a an of in on at and or de la le el di del".split())

# Lower-case generic endings of attraction names; each starts with a space.
_ATTRACTION_SUFFIXES = (
    " temple",
    " shrine",
    " castle",
    " palace",
    " park",
    " museum",
    " garden",
    " bridge",
    " tower",
    " square",
    " market",
    " street",
    " station",
    " hall",
    " church",
    " basilica",
    " monastery",
    " gallery",
    " theater",
    " theatre",
    " library",
)

logger = logging.getLogger("roamify")
|
|
|
|
| @dataclass |
| class _Provider: |
| """Configuration for a single LLM provider in the rotation chain.""" |
| name: str |
| api_key: str |
| base_url: str |
| model: str |
|
|
|
|
| def _http_get_json(url: str, timeout: int = 5, retries: int = 2) -> dict | None: |
| """GET a JSON URL with retry on rate-limit and transient errors.""" |
| for attempt in range(retries + 1): |
| try: |
| req = urllib.request.Request(url, headers={"User-Agent": "TravelPlanner/1.0"}) |
| with urllib.request.urlopen(req, timeout=timeout) as resp: |
| return json.loads(resp.read().decode()) |
| except urllib.error.HTTPError as e: |
| if e.code in (429, 502, 503) and attempt < retries: |
| time.sleep(1.0 * (attempt + 1)) |
| continue |
| return None |
| except (TimeoutError, OSError, ConnectionError): |
| if attempt < retries: |
| time.sleep(0.5 * (attempt + 1)) |
| continue |
| return None |
| except Exception: |
| return None |
| return None |
|
|
|
|
def _resolve_wiki_title(name: str) -> str:
    """Return the title of the top English Wikipedia search hit for *name*.

    Returns an empty string when the request fails or nothing is found.
    """
    params = {
        "action": "query",
        "list": "search",
        "srsearch": name,
        "format": "json",
        "srlimit": 1,
    }
    payload = _http_get_json(
        "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode(params),
        timeout=8,
    )
    if not payload:
        return ""
    hits = payload.get("query", {}).get("search", [])
    return hits[0]["title"] if hits else ""
|
|
|
|
| def _is_media_entertainment_page(title: str, extract: str) -> bool: |
| """Check if a Wikipedia page is a film, TV show, video game, or other |
| non-tourist media — return True to skip it for attraction images.""" |
| title_lower = title.lower() |
| extract_lower = extract.lower() |
|
|
| |
| disambig_patterns = [ |
| "(film)", "(movie)", "(tv series)", "(tv program)", "(tv show)", |
| "(video game)", "(album)", "(song)", "(novel)", "(book)", |
| "(comics)", "(anime)", "(manga)", "(soundtrack)", "(ep)", |
| "(single)", "(play)", "(musical)", "(short film)", |
| ] |
| if any(p in title_lower for p in disambig_patterns): |
| return True |
|
|
| |
| |
| first_200 = extract_lower[:200] |
| media_indicators = [ |
| " is a ", " is an ", |
| ] |
| media_types = [ |
| " film", " movie", " tv series", " television series", |
| " video game", " album by", " novel by", " song by", |
| " comic", " manga series", " anime series", |
| ] |
| has_indicator = any(i in first_200 for i in media_indicators) |
| has_type = any(t in first_200 for t in media_types) |
| if has_indicator and has_type: |
| return True |
|
|
| return False |
|
|
|
|
def _fetch_wiki_image(name: str, city: str = "") -> str:
    """Find a thumbnail for *name* on English Wikipedia.

    Candidate article titles are, in order: the name without its trailing
    parenthetical, the name as given, and the titles the search API resolves
    for them. If that yields at most two candidates and *city* is given,
    city-qualified searches are tried as well. A candidate is skipped when
    its title shares neither a word nor a 4-character fragment with the
    attraction name, or when the page is about a film, game, album, etc.

    For each remaining candidate the REST summary endpoint is asked first,
    then the ``pageimages`` query API.

    Returns:
        The thumbnail URL, or ``""`` when no candidate has one.
    """

    def _core_words(text: str) -> set[str]:
        # "\-" keeps the hyphen literal. The previous class "[()\\-_,]" was a
        # range from backslash to underscore, so hyphenated names were never
        # split into words.
        return set(re.sub(r"[()\-_,]", " ", text).lower().split()) - _STOP_WORDS

    def _shares_fragment(left: set[str], right: set[str]) -> bool:
        # True when any 4-character slice of a word in *left* occurs inside
        # a word of *right*; words shorter than 4 characters have no slices.
        return any(
            word[i:i + 4] in other
            for word in left
            for other in right
            for i in range(len(word) - 3)
        )

    stripped = re.sub(r"\s*\(.+\)\s*$", "", name).strip()

    candidates: list[str] = []
    if stripped and stripped != name:
        candidates.append(stripped)
    candidates.append(name)

    search_names = [stripped] if stripped else []
    if name and (not stripped or name != stripped):
        search_names.append(name)

    for search_name in search_names:
        resolved = _resolve_wiki_title(search_name)
        if resolved and resolved not in candidates:
            candidates.append(resolved)

    if city and len(candidates) <= 2:
        for search_name in search_names:
            for city_query in (
                f"{search_name}, {city}",
                f"{search_name} ({city})",
                f"{search_name} {city}",
            ):
                resolved = _resolve_wiki_title(city_query)
                if resolved and resolved not in candidates:
                    candidates.append(resolved)
                    # One new city-qualified title per search name is enough.
                    break

    name_core = _core_words(stripped or name)

    for title in candidates:
        if not title:
            continue

        title_core = _core_words(title)
        if name_core and title_core and not (name_core & title_core):
            if not _shares_fragment(name_core, title_core):
                continue

        # safe="" so that a "/" inside a title is escaped as %2F instead of
        # being read as a path separator by the REST endpoint.
        summary_url = (
            "https://en.wikipedia.org/api/rest_v1/page/summary/"
            + urllib.parse.quote(title, safe="")
        )
        data = _http_get_json(summary_url, timeout=10)
        if data:
            page_title = data.get("title", "") or ""
            extract = data.get("extract", "") or ""
            if _is_media_entertainment_page(page_title, extract):
                continue
            source = data.get("thumbnail", {}).get("source", "")
            if source:
                return source

        # Fallback: ask the action API for the page image.
        img_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode({
            "action": "query",
            "titles": title,
            "prop": "pageimages",
            "pithumbsize": 500,
            "format": "json",
        })
        img_data = _http_get_json(img_url, timeout=10)
        if img_data:
            pages = img_data.get("query", {}).get("pages", {})
            for page in pages.values():
                thumb = page.get("thumbnail", {}).get("source", "")
                if thumb:
                    return thumb
    return ""
|
|
|
|
| _MULTILANG_WIKI = ["fr", "de", "es", "it", "ja"] |
|
|
|
|
| def _fetch_wiki_image_multilang(name: str, city: str = "") -> str: |
| """Tier 1.5: Search non-English Wikipedias for an image. |
| When English Wikipedia has no thumbnail, try French, German, Spanish, |
| Italian, and Japanese editions in parallel — the next largest by article |
| count and rich in travel-related imagery. |
| """ |
| clean = re.sub(r"\s*\(.*?\)\s*$", "", name).strip() |
| search_terms = [clean] if clean and clean != name else [clean, name] |
| if city: |
| search_terms.append(f"{clean}, {city}" if clean else f"{name}, {city}") |
|
|
| import concurrent.futures |
|
|
| def _try_lang(lang: str) -> str: |
| |
| for term in search_terms[:2]: |
| if not term: |
| continue |
| try: |
| url = f"https://{lang}.wikipedia.org/w/api.php?" + urllib.parse.urlencode({ |
| "action": "query", |
| "generator": "search", |
| "gsrsearch": term, |
| "gsrlimit": 3, |
| "prop": "pageimages", |
| "pithumbsize": 500, |
| "format": "json", |
| }) |
| req = urllib.request.Request(url, headers={"User-Agent": "TravelPlanner/1.0"}) |
| with urllib.request.urlopen(req, timeout=3) as resp: |
| data = json.loads(resp.read().decode()) |
| pages = data.get("query", {}).get("pages", {}) |
| for page in pages.values(): |
| thumb = page.get("thumbnail", {}).get("source", "") |
| if thumb: |
| return thumb |
| except Exception: |
| continue |
| return "" |
|
|
| with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool: |
| futures = {pool.submit(_try_lang, lang): lang for lang in _MULTILANG_WIKI} |
| for f in concurrent.futures.as_completed(futures): |
| try: |
| result = f.result(timeout=5) |
| if result: |
| |
| for other in futures: |
| other.cancel() |
| return result |
| except Exception: |
| continue |
| return "" |
|
|
|
|
| |
| _TOURISM_KEYWORDS = { |
| "church", "cathedral", "basilica", "monument", "museum", "palace", |
| "castle", "tower", "bridge", "park", "garden", "square", "plaza", |
| "temple", "shrine", "mosque", "synagogue", "abbey", "fort", "fortress", |
| "arena", "stadium", "theater", "theatre", "gallery", "library", |
| "cemetery", "aqueduct", "fountain", "arch", "gate", "wall", |
| "district", "neighborhood", "quarter", "area", "market", "island", |
| "building", "skyscraper", |
| } |
|
|
|
|
def _fetch_wikidata_image(name: str, city: str = "", country: str = "") -> str:
    """Build a Commons image URL from the Wikidata ``P18`` (image) claim.

    Queries tried in order: the name as given, the name without its trailing
    parenthetical, then that stripped name qualified with *city* and with
    *country*. Among the first five search hits of a query the entity is
    chosen by:

    1. a description containing one of ``_TOURISM_KEYWORDS``;
    2. otherwise a non-empty label that contains, or is contained in, the
       stripped name;
    3. otherwise the first hit.

    Returns:
        The ``upload.wikimedia.org`` URL of the entity's image, or ``""``
        when no query produces an entity with a usable ``P18`` value.
    """
    clean = re.sub(r"\s*\(.*?\)\s*$", "", name).strip()
    clean_lower = clean.lower()

    queries = [name]
    if clean and clean != name:
        queries.append(clean)
    if city and clean:
        queries.append(f"{clean}, {city}")
    if country and clean and country != city:
        queries.append(f"{clean}, {country}")

    for query in queries:
        search_url = "https://www.wikidata.org/w/api.php?" + urllib.parse.urlencode({
            "action": "wbsearchentities",
            "search": query,
            "language": "en",
            "format": "json",
            "limit": 5,
        })
        data = _http_get_json(search_url)
        if not data:
            continue
        results = data.get("search", [])
        if not results:
            continue
        shortlist = results[:5]

        best = None
        for hit in shortlist:
            desc = (hit.get("description") or "").lower()
            if any(kw in desc for kw in _TOURISM_KEYWORDS):
                best = hit
                break
        if best is None and clean_lower:
            for hit in shortlist:
                label = (hit.get("label") or "").lower()
                # An empty label is a substring of everything, so it must not
                # count as a name match.
                if label and (clean_lower in label or label in clean_lower):
                    best = hit
                    break
        if best is None:
            best = shortlist[0]

        qid = best.get("id")
        if not qid:
            continue

        entity_url = "https://www.wikidata.org/w/api.php?" + urllib.parse.urlencode({
            "action": "wbgetclaims",
            "entity": qid,
            "property": "P18",
            "format": "json",
        })
        claims_data = _http_get_json(entity_url)
        if not claims_data:
            continue
        p18 = claims_data.get("claims", {}).get("P18", [])
        if not p18:
            continue

        # A claim may carry no datavalue at all; treat that as "no image on
        # this entity" and move on instead of raising.
        try:
            filename = p18[0]["mainsnak"]["datavalue"]["value"]
        except (KeyError, IndexError, TypeError):
            continue
        if not isinstance(filename, str) or not filename:
            continue

        # Commons stores a file under /<h>/<hh>/, where h and hh are the
        # first one and two hex digits of the MD5 of the underscored name.
        safe = filename.replace(" ", "_")
        md5 = hashlib.md5(safe.encode()).hexdigest()
        return f"https://upload.wikimedia.org/wikipedia/commons/{md5[0]}/{md5[:2]}/{safe}"
    return ""
|
|
|
|
def _fetch_commons_image(name: str, city: str = "", country: str = "") -> str:
    """Search Wikimedia Commons for an image file matching *name*.

    Queries tried in order: the name, the name without its trailing
    parenthetical, that stripped name qualified with *city* and *country*,
    the stripped name minus a generic suffix such as " Park", and its first
    two words. A file is accepted when it has an image extension and its
    name shares a word, or a 4-character fragment, with the attraction name.

    Returns:
        A 500px thumbnail URL on ``upload.wikimedia.org``, or ``""``.
    """
    image_exts = (".jpg", ".jpeg", ".png", ".webp")
    generic_suffixes = (
        " Market", " Garden", " Beach", " Park", " Museum", " Square",
        " Tower", " Bridge", " Temple", " Shrine", " Castle", " Palace",
        " Street", " Station",
    )

    def _fuzzy_overlap(left: set[str], right: set[str]) -> bool:
        # Any 4-character slice of a word in *left* found inside a word of *right*.
        return any(
            any(w[i:i + 4] in tw for i in range(len(w) - 3) if len(w) >= 4)
            for w in left
            for tw in right
        )

    clean = re.sub(r"\s*\(.*?\)\s*$", "", name).strip()
    name_core = set(re.sub(r"[()\-_,]", " ", clean or name).lower().split()) - _STOP_WORDS

    queries = [name]
    if clean and clean != name:
        queries.append(clean)
    if city and clean:
        queries.append(f"{clean}, {city}")
    if country and clean and country != city:
        queries.append(f"{clean}, {country}")

    # Variant without a generic trailing noun ("Ueno Park" -> "Ueno").
    for suffix in generic_suffixes:
        if not clean.endswith(suffix):
            continue
        base = clean[:-len(suffix)].strip()
        if base and base not in queries and base != clean:
            queries.append(base)

    # Variant made of the first two words of a longer name.
    words = clean.split()
    if len(words) > 2:
        two_word = " ".join(words[:2])
        if two_word not in queries:
            queries.append(two_word)

    for query in queries:
        params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "srnamespace": "6",
            "format": "json",
            "srlimit": 5,
        }
        data = _http_get_json(
            "https://commons.wikimedia.org/w/api.php?" + urllib.parse.urlencode(params),
            timeout=10,
            retries=1,
        )
        if not data:
            continue

        for hit in data.get("query", {}).get("search", []):
            title = hit.get("title", "")
            lowered = title.lower()
            if not lowered.endswith(image_exts):
                continue

            if name_core:
                file_core = set(
                    re.sub(r"[()\-_,.]", " ", lowered.replace("file:", "")).split()
                ) - _STOP_WORDS
                if not (name_core & file_core) and not _fuzzy_overlap(name_core, file_core):
                    continue

            filename = title.replace("File:", "").strip()
            safe = filename.replace(" ", "_")
            md5 = hashlib.md5(safe.encode()).hexdigest()
            return f"https://upload.wikimedia.org/wikipedia/commons/thumb/{md5[0]}/{md5[:2]}/{safe}/500px-{safe}"
    return ""
|
|
|
|
| def _fetch_local_name_image(name: str, city: str = "", country: str = "") -> str: |
| """Tier 5: Try parenthetical local name from the attraction. |
| E.g. 'Awaji Island (Koko-shima)' tries 'Koko-shima' on Commons and Wikidata. |
| Also tries '{local_name}, {city}' and '{local_name} {city}'. |
| """ |
| m = re.search(r"\((.+?)\)", name) |
| if not m: |
| return "" |
| local = m.group(1).strip() |
| if not local: |
| return "" |
|
|
| |
| queries = [local] |
| if city: |
| queries.append(f"{local}, {city}") |
| if country and country != city: |
| queries.append(f"{local}, {country}") |
|
|
| for query in queries: |
| url = _fetch_commons_image(query) |
| if url: |
| return url |
|
|
| |
| for query in queries: |
| url = _fetch_wikidata_image(query, city=city, country=country) |
| if url: |
| return url |
|
|
| return "" |
|
|
|
|
| def _get_content_hash(url: str, timeout: int = 10) -> str: |
| """Download first 4 KB of an image URL and return a SHA256 hex digest. |
| |
| Used to detect identical photos served under different stock photo URLs. |
| Returns empty string on any error — failure is non-fatal (skip dedup). |
| """ |
| try: |
| req = urllib.request.Request(url, headers={ |
| "User-Agent": "Mozilla/5.0 (compatible; Roamify/1.0)", |
| }) |
| ctx = __import__("ssl").create_default_context() |
| ctx.check_hostname = False |
| ctx.verify_mode = __import__("ssl").CERT_NONE |
| with urllib.request.urlopen(req, context=ctx, timeout=timeout) as resp: |
| return hashlib.sha256(resp.read(4096)).hexdigest()[:16] |
| except Exception: |
| return "" |
|
|
|
|
def _register_content_hash(url: str, city_key: str) -> bool:
    """Count one more use of the image at *url* within *city_key*.

    The image is identified by ``_get_content_hash``. If no hash can be
    computed the image is allowed without being counted.

    Returns:
        ``True`` when the image may be used, i.e. its hash had been handed
        out fewer than ``_MAX_STOCK_SHARING`` times for this city (the
        counter is then incremented); ``False`` otherwise.
    """
    digest = _get_content_hash(url)
    if not digest:
        return True

    with _SEEN_CONTENT_HASHES_LOCK:
        per_city = _SEEN_CONTENT_HASHES.setdefault(city_key, {})
        uses = per_city.get(digest, 0)
        allowed = uses < _MAX_STOCK_SHARING
        if allowed:
            per_city[digest] = uses + 1
    return allowed
|
|
|
|
| def _fetch_pexels_api_image(name: str, city: str = "", country: str = "") -> str: |
| """Tier 6: Search Pexels for a high-quality photo. |
| 25,000 req/month. Better for landmarks/architecture. |
| Requires User-Agent header — Pexels blocks default Python-urllib UA (403/1010). |
| """ |
| pexels_key = os.environ.get("PEXELS_API_KEY", "") |
| if not pexels_key: |
| return "" |
|
|
| clean = re.sub(r"\s*\(.*?\)\s*$", "", name).strip() |
| query = clean |
| if city: |
| query = f"{clean} {city}" |
| elif country: |
| query = f"{clean} {country}" |
|
|
| search_url = "https://api.pexels.com/v1/search?" + urllib.parse.urlencode({ |
| "query": query, |
| "per_page": 3, |
| "orientation": "landscape", |
| "size": "medium", |
| }) |
| try: |
| req = urllib.request.Request(search_url, headers={ |
| "Authorization": pexels_key, |
| "User-Agent": "Mozilla/5.0 (compatible; Roamify/1.0; +https://roamify.app)", |
| }) |
| with urllib.request.urlopen(req, timeout=8) as resp: |
| data = json.loads(resp.read().decode()) |
| photos = data.get("photos", []) |
| city_key = city or country or "" |
| for photo in photos: |
| url = photo["src"]["medium"] |
| if _register_content_hash(url, city_key): |
| return url |
| except Exception: |
| pass |
| return "" |
|
|
|
|
| def _fetch_unsplash_api_image(name: str, city: str = "", country: str = "") -> str: |
| """Tier 8: Search Unsplash for a high-quality landscape photo. |
| Only called when all Wikimedia sources fail. Uses orientation=landscape |
| to avoid tall/portrait photos. Respects 50 req/hr demo rate limit. |
| """ |
| unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "") |
| if not unsplash_key: |
| return "" |
|
|
| |
| clean = re.sub(r"\s*\(.*?\)\s*$", "", name).strip() |
| query = clean |
| if city: |
| query = f"{clean} {city}" |
| elif country: |
| query = f"{clean} {country}" |
|
|
| search_url = "https://api.unsplash.com/search/photos?" + urllib.parse.urlencode({ |
| "query": query, |
| "per_page": 3, |
| "orientation": "landscape", |
| }) |
| try: |
| req = urllib.request.Request(search_url, headers={ |
| "Authorization": f"Client-ID {unsplash_key}", |
| "Accept-Version": "v1", |
| }) |
| with urllib.request.urlopen(req, timeout=8) as resp: |
| data = json.loads(resp.read().decode()) |
| results = data.get("results", []) |
| city_key = city or country or "" |
| for result in results: |
| url = result["urls"]["small"] |
| if _register_content_hash(url, city_key): |
| return url |
| except Exception: |
| pass |
| return "" |
|
|
|
|
| def _enrich_one_item(item: dict, city: str = "", country: str = "") -> None: |
| """Look up image for a single item using 7-tier fallback: |
| 1. Wikipedia REST/pageimages API (English) |
| 2. Wikipedia REST/pageimages API (French, German, Spanish, Italian, Japanese) |
| 3. Wikidata P18 image claim (with city/country context) |
| 4. Wikimedia Commons search (with simplified name variants embedded) |
| 5. Local name from parentheses (e.g. Koko-shima from Awaji Island) |
| 6. Pexels search (25,000 req/month, better for landmarks) |
| 7. Unsplash search (landscape orientation, last resort) |
| |
| Results are cached in _IMAGE_CACHE to avoid repeat API calls across searches. |
| """ |
| if item.get("image_url"): |
| return |
| name = item.get("name", "") |
| if not name: |
| item["image_url"] = "" |
| return |
|
|
| |
| |
| cache_key = (name, city or "", country or "") |
| cached_url = _IMAGE_CACHE.get(cache_key) |
| if cached_url: |
| item["image_url"] = cached_url |
| return |
|
|
| |
| url = _fetch_wiki_image(name, city=city) |
| if url: |
| _IMAGE_CACHE[cache_key] = url |
| item["image_url"] = url |
| _save_image_cache() |
| return |
| |
| url = _fetch_wiki_image_multilang(name, city=city) |
| if url: |
| _IMAGE_CACHE[cache_key] = url |
| item["image_url"] = url |
| _save_image_cache() |
| return |
| |
| url = _fetch_wikidata_image(name, city=city, country=country) |
| if url: |
| _IMAGE_CACHE[cache_key] = url |
| item["image_url"] = url |
| _save_image_cache() |
| return |
| |
| url = _fetch_commons_image(name, city=city, country=country) |
| if url: |
| _IMAGE_CACHE[cache_key] = url |
| item["image_url"] = url |
| _save_image_cache() |
| return |
| |
| url = _fetch_local_name_image(name, city=city, country=country) |
| if url: |
| _IMAGE_CACHE[cache_key] = url |
| item["image_url"] = url |
| _save_image_cache() |
| return |
| |
| url = _fetch_pexels_api_image(name, city=city, country=country) |
| if url: |
| _IMAGE_CACHE[cache_key] = url |
| item["image_url"] = url |
| _save_image_cache() |
| return |
| |
| url = _fetch_unsplash_api_image(name, city=city, country=country) |
| if url: |
| _IMAGE_CACHE[cache_key] = url |
| item["image_url"] = url |
| _save_image_cache() |
| return |
|
|
| |
| _IMAGE_CACHE[cache_key] = "" |
| item["image_url"] = "" |
| _save_image_cache() |
|
|
|
|
| def _enrich_with_images(items: list[dict], city: str = "", country: str = "") -> list[dict]: |
| """Add image_url to each item using a 7-tier fallback: |
| 1. Wikipedia REST API — English page/summary |
| 2. Wikipedia REST API — multi-language (fr, de, es, it, ja) |
| 3. Wikidata P18 image claim → full Commons URL (MD5 hash path) |
| 4. Wikimedia Commons search (with simplified/variant names embedded) |
| 5. Local name from parentheses (e.g. Koko-shima from Awaji Island) |
| 6. Pexels search (landscape, 25,000 req/month) |
| 7. Unsplash search (landscape orientation, last resort) |
| All lookups run concurrently via ThreadPoolExecutor (max 6 workers). |
| """ |
| with concurrent.futures.ThreadPoolExecutor(max_workers=6) as pool: |
| futures = [pool.submit(_enrich_one_item, item, city=city, country=country) for item in items] |
| concurrent.futures.wait(futures) |
| return items |
|
|
|
|
| def _haversine_km(lat1, lon1, lat2, lon2): |
| """Return distance in km between two lat/lon pairs.""" |
| R = 6371.0 |
| dlat = math.radians(lat2 - lat1) |
| dlon = math.radians(lon2 - lon1) |
| a = math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2 |
| return R * 2 * math.asin(math.sqrt(a)) |
|
|
|
|
def _nominatim_search_cached(query: str, timeout: int = 10, limit: int = 1) -> tuple[dict | None, bool]:
    """Search Nominatim for *query*, consulting ``_GEOCODE_CACHE`` first.

    Args:
        query: Free-text search string.
        timeout: Socket timeout in seconds for the HTTP request.
        limit: ``limit`` parameter sent to Nominatim. Results for limits
            other than 1 are cached under a separate key.

    Returns:
        ``(result, was_cached)``. ``result`` is the first entry of the
        response, or ``None`` when there is no match or the request failed.
        Only the first entry is returned, whatever *limit* is.

    Requests are spaced at least 1.01 s apart to respect Nominatim's
    1 request/second policy; the wait only happens on a cache miss.
    """
    global _nominatim_last_call

    cache_key = query if limit == 1 else f"{query}__limit={limit}"
    if cache_key in _GEOCODE_CACHE:
        return _GEOCODE_CACHE[cache_key], True

    url = "https://nominatim.openstreetmap.org/search?" + urllib.parse.urlencode({
        "q": query, "format": "json", "limit": limit, "accept-language": "en",
    })

    with _nominatim_lock:
        wait = 1.01 - (time.time() - _nominatim_last_call)
        if wait > 0:
            time.sleep(wait)
        _nominatim_last_call = time.time()

    data = _http_get_json(url, timeout=timeout, retries=2)
    if isinstance(data, list):
        if data:
            _GEOCODE_CACHE[cache_key] = data[0]
            _save_geocode_cache()
            return data[0], False
        # An empty list is a definitive "no match": remember it.
        _GEOCODE_CACHE[cache_key] = None
        return None, False

    # None (request failed) or an unexpected payload. Do not cache it: a
    # transient outage would otherwise be recorded as "place does not exist"
    # and later written to disk with the rest of the cache.
    return None, False
|
|
|
|
def _geocode_city(city: str) -> tuple[float, float, list[float]] | None:
    """Geocode a city centre through the cached Nominatim search.

    Returns:
        ``(lat, lon, boundingbox)``. ``boundingbox`` is the result's four
        bounding-box values as floats, or ``[]`` when it does not have
        exactly four. Returns ``None`` when nothing is found or the result
        is missing or has malformed coordinates.
    """
    result, _ = _nominatim_search_cached(city)
    if not result:
        return None

    def _is_place(hit: dict) -> bool:
        return hit.get("type") == "city" or hit.get("class") == "place"

    if not _is_place(result):
        # NOTE(review): _nominatim_search_cached returns only the first
        # entry even with limit=5, so this fallback can only see the top
        # hit again. Confirm whether it was meant to scan all five results.
        fallback, _ = _nominatim_search_cached(city, timeout=10, limit=5)
        if fallback and _is_place(fallback):
            result = fallback
            _GEOCODE_CACHE[city] = fallback
            _save_geocode_cache()

    try:
        lat = float(result["lat"])
        lon = float(result["lon"])
        bb = [float(v) for v in result.get("boundingbox", [])]
    except (KeyError, ValueError, TypeError, IndexError):
        # TypeError: a null coordinate or a null "boundingbox" in a cached
        # or returned entry.
        return None
    return lat, lon, (bb if len(bb) == 4 else [])
|
|
|
|
|
|
| def _verify_coordinates(items: list[dict], city: str) -> list[dict]: |
| """Verify attraction coordinates. |
| |
| Strategy: |
| 1. Geocode city center (1 cached Nominatim query), get bounding box |
| 2. Adaptive radius: max(15km, bounding_box_diagonal x 0.6) |
| Compact European cities stay ~15km, spread-out cities (Bali, Dubai) |
| get a larger radius proportional to their bounding box. |
| 3. For each item: if LLM-provided coords are non-zero and within |
| adaptive radius of city center, trust them — skip Nominatim entirely. |
| 4. Only geocode items whose LLM coords fail the radius check. |
| This eliminates ~80% of Nominatim calls on a good LLM response. |
| """ |
# Returns a new list containing the items that pass verification; items that
# fail are left out. When a Nominatim lookup is accepted, the item's
# "latitude"/"longitude" are overwritten in place with Nominatim's values.
#
# NOTE(review): indentation is not recoverable from this copy of the file.
# The `else:` / `continue` pair that follows the if/elif/else chain of the
# first lookup could bind either to `if city_center:` or to
# `if n_lat is not None:`. Confirm against the original before refactoring,
# because the two readings drop different items.
| |
| city_result = _geocode_city(city) |
| if city_result: |
| city_center = (city_result[0], city_result[1]) |
| |
| |
| |
# Radius: bounding-box spans are converted at 111 km per degree, with the
# longitude span scaled by cos(latitude of the city centre); the radius is
# 0.6 x the diagonal, but never less than 15 km.
| bb = city_result[2] |
| if len(bb) == 4: |
| km_lat = (bb[1] - bb[0]) * 111.0 |
| km_lon = (bb[3] - bb[2]) * 111.0 * math.cos(math.radians(city_center[0])) |
| MAX_CITY_DIST_KM = max(15, math.sqrt(km_lat**2 + km_lon**2) * 0.6) |
| else: |
| MAX_CITY_DIST_KM = 15 |
| else: |
| city_center = None |
| MAX_CITY_DIST_KM = 15 |
| verified = [] |
|
|
| for item in items: |
| name = item.get("name", "") |
| |
# Items whose name is empty once a trailing parenthetical is removed are kept
# without any check.
| clean_name = re.sub(r"\s*\(.*?\)\s*$", "", name).strip() |
| if not clean_name: |
| verified.append(item) |
| continue |
|
|
| |
# Fast path: keep the item unchanged when both of its own coordinates parse,
# are non-zero, and lie within the radius of the city centre.
| llm_lat = item.get("latitude") |
| llm_lon = item.get("longitude") |
| if llm_lat is not None and llm_lon is not None and city_center: |
| try: |
| f_lat = float(llm_lat) |
| f_lon = float(llm_lon) |
| except (ValueError, TypeError): |
| f_lat, f_lon = 0, 0 |
| if f_lat != 0 and f_lon != 0: |
| dist = _haversine_km(city_center[0], city_center[1], f_lat, f_lon) |
| if dist <= MAX_CITY_DIST_KM: |
| |
| verified.append(item) |
| continue |
|
|
| |
| |
# First lookup: "<clean name>, <city>".
| query = f"{clean_name}, {city}" |
| result1, _ = _nominatim_search_cached(query) |
|
|
| n_lat, n_lon, display_name = None, None, "" |
|
|
| if result1: |
| try: |
| n_lat = float(result1["lat"]) |
| n_lon = float(result1["lon"]) |
| display_name = (result1.get("display_name", "") or "").lower() |
| except (KeyError, ValueError, IndexError): |
| pass |
|
|
| if n_lat is not None: |
| |
# A hit is accepted only if it is within the radius AND its display name
# contains a word of the city name AND a word (longer than 3 characters) of
# the attraction name. Both checks are substring tests.
| city_lower = city.lower() |
| city_words = set(city_lower.split()) |
| mentions_city = any(w in display_name for w in city_words) |
| |
| |
| clean_lower = clean_name.lower() |
| attraction_words = set(re.sub(r"[()\-_,]", " ", clean_lower).split()) |
| name_in_display = any(w in display_name for w in attraction_words if len(w) > 3) |
| |
| if city_center: |
| dist = _haversine_km(city_center[0], city_center[1], n_lat, n_lon) |
| if dist <= MAX_CITY_DIST_KM and mentions_city and name_in_display: |
| item["latitude"] = n_lat |
| item["longitude"] = n_lon |
| verified.append(item) |
| continue |
# Within the radius but the display name does not match: no decision here,
# execution falls through to the second lookup below.
| elif dist <= MAX_CITY_DIST_KM and not (mentions_city and name_in_display): |
| pass |
# Outside the radius: the item is dropped.
| else: |
| continue |
# NOTE(review): ambiguous binding, see the note at the top of the function.
| else: |
| continue |
| |
|
|
| |
# Second lookup: the bare attraction name, without the city.
# clean_name_no_paren is produced by the same substitution as clean_name.
| clean_name_no_paren = re.sub(r"\s*\(.*?\)\s*$", "", name).strip() |
| query2 = clean_name_no_paren |
| result2, _ = _nominatim_search_cached(query2) |
|
|
| n_lat2, n_lon2, display_name2 = None, None, "" |
| if result2: |
| try: |
| n_lat2 = float(result2["lat"]) |
| n_lon2 = float(result2["lon"]) |
| display_name2 = (result2.get("display_name", "") or "").lower() |
| except (KeyError, ValueError, IndexError): |
| pass |
|
|
| if n_lat2 is not None and city_center: |
| |
| city_lower = city.lower() |
| city_words = set(city_lower.split()) |
| mentions_city = any(w in display_name2 for w in city_words) |
| |
| |
| clean_lower = clean_name.lower() |
| attraction_words = set(re.sub(r"[()\-_,]", " ", clean_lower).split()) |
| name_in_display = any(w in display_name2 for w in attraction_words if len(w) > 3) |
| |
| dist = _haversine_km(city_center[0], city_center[1], n_lat2, n_lon2) |
| |
# Same acceptance rule as the first lookup; any other outcome drops the item.
| if dist <= MAX_CITY_DIST_KM and mentions_city and name_in_display: |
| |
| item["latitude"] = n_lat2 |
| item["longitude"] = n_lon2 |
| verified.append(item) |
| continue |
| else: |
| |
| continue |
# No usable second result, or no city centre: decide on the item's own
# coordinates. It is kept when they are (0, 0) or unparseable, or when there
# is no city centre; otherwise it is kept only if within the radius.
| else: |
| |
| try: |
| lat = float(item.get("latitude", 0)) |
| lon = float(item.get("longitude", 0)) |
| except (ValueError, TypeError): |
| lat, lon = 0, 0 |
| if lat == 0 and lon == 0 or not city_center: |
| verified.append(item) |
| else: |
| dist = _haversine_km(city_center[0], city_center[1], lat, lon) |
| if dist <= MAX_CITY_DIST_KM: |
| verified.append(item) |
|
|
| return verified |
|
|
|
|
def _get_providers() -> list[_Provider]:
    """Build the provider chain, in the order the providers should be tried.

    Everything is read from environment variables. A provider whose API key
    variable is unset or empty is left out, so providers can be switched on
    and off through the environment alone. Order when all keys are present:
    openrouter-deepseek, ollama-cloud, openrouter-gemma, gemini. Both
    OpenRouter entries share ``OPENROUTER_API_KEY`` and
    ``OPENROUTER_BASE_URL``; the gemma entry's model is fixed.
    """
    env = os.environ.get

    openrouter_key = env("OPENROUTER_API_KEY", "")
    ollama_key = env("OLLAMA_API_KEY", "")
    gemini_key = env("GEMINI_API_KEY", "")
    openrouter_url = env("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")

    # (name, api_key, base_url, model), listed in chain order.
    candidates = (
        (
            "openrouter-deepseek",
            openrouter_key,
            openrouter_url,
            env("OPENROUTER_MODEL", "deepseek/deepseek-v4-flash:free"),
        ),
        (
            "ollama-cloud",
            ollama_key,
            env("OLLAMA_BASE_URL", "https://ollama.com/v1"),
            env("OLLAMA_MODEL", "deepseek-v4-flash:cloud"),
        ),
        (
            "openrouter-gemma",
            openrouter_key,
            openrouter_url,
            "google/gemma-4-26b-a4b-it:free",
        ),
        (
            "gemini",
            gemini_key,
            env("GEMINI_BASE_URL", "https://generativelanguage.googleapis.com/v1beta/openai/"),
            env("GEMINI_MODEL", "gemini-2.5-flash"),
        ),
    )
    return [
        _Provider(name=label, api_key=key, base_url=url, model=model)
        for label, key, url, model in candidates
        if key
    ]
|
|
|
|
def _get_providers_randomized() -> list[_Provider]:
    """Return the provider chain with its two leading DeepSeek entries shuffled.

    When the first two providers from ``_get_providers()`` are both DeepSeek
    V4 Flash backends (``openrouter-deepseek`` / ``ollama-cloud``), they are
    swapped with probability 0.5 so traffic is spread across both and neither
    rate limit is hit as quickly. Every other chain is returned unchanged.
    """
    chain = _get_providers()
    deepseek_backends = ("openrouter-deepseek", "ollama-cloud")

    if len(chain) < 2:
        return chain
    if any(candidate.name not in deepseek_backends for candidate in chain[:2]):
        return chain

    import random

    if random.random() < 0.5:
        chain[0], chain[1] = chain[1], chain[0]
    return chain
|
|
|
|
| def _parse_json_response(raw: str) -> list[dict] | None: |
| """Robustly extract JSON array from LLM output. |
| Returns None if parsing fails entirely (caller should show st.error).""" |
| text = raw.strip() |
| text = re.sub(r"^```(?:json)?\s*\n?", "", text) |
| text = re.sub(r"\n?```\s*$", "", text) |
| text = text.strip() |
|
|
| try: |
| parsed = json.loads(text) |
| if isinstance(parsed, list): |
| return parsed |
| if isinstance(parsed, dict): |
| return [parsed] |
| except json.JSONDecodeError: |
| pass |
|
|
| start = text.find("[") |
| end = text.rfind("]") |
| if start != -1 and end > start: |
| candidate = text[start:end + 1] |
| try: |
| parsed = json.loads(candidate) |
| if isinstance(parsed, list): |
| return parsed |
| except json.JSONDecodeError: |
| pass |
| |
| truncated = text[start:] |
| |
| truncated = re.sub(r'[,\s]*"[^"]*":\s*"[^"]*$', '', truncated) |
| for closing in ['}]}', '}]', '}', ']']: |
| attempt = truncated + closing |
| try: |
| parsed = json.loads(attempt) |
| if isinstance(parsed, list) and len(parsed) > 0: |
| return parsed |
| except json.JSONDecodeError: |
| continue |
|
|
| pattern = re.compile(r"\[[\s\S]*\](?=\s*$|\s*```)", re.MULTILINE) |
| matches = pattern.findall(text) |
| for match in reversed(matches): |
| try: |
| parsed = json.loads(match) |
| if isinstance(parsed, list): |
| return parsed |
| except json.JSONDecodeError: |
| continue |
|
|
| return None |
|
|
|
|
|
|
def _verify_with_model(items: list[dict], city: str, providers: list[_Provider]) -> list[dict]:
    """Ask the second provider which attractions are really in ``city``.

    The LLM sometimes lists attractions from other cities. Nominatim catches
    most of these; this adds a second, model-based verification layer. The
    check is best-effort: on any failure the input list is returned unchanged.

    Args:
        items: Candidate attractions; only each item's ``name`` is sent.
        city: Target city.
        providers: Provider chain; ``providers[1]`` acts as the verifier.

    Returns:
        The items the verifier confirmed, in the order it listed them. The
        original ``items`` is returned when there is nothing to verify, fewer
        than two providers exist, the call or parsing fails, or the verifier
        confirms nothing.
    """
    if not items or len(providers) < 2:
        return items

    verifier = providers[1]

    names = [item.get("name", "") for item in items]
    names_str = "\n".join(f"{i+1}. {name}" for i, name in enumerate(names))

    prompt = f"""You are a city geography expert. Determine which of these attractions are actually located IN the city of {city}.

For each attraction, answer ONLY "YES" (it is located in {city}) or "NO" (it is in a different city, or is a well-known landmark from elsewhere).

Return ONLY a JSON array of indices (1-based) that are YES, like [1, 3, 4]. No other text.

Attractions:
{names_str}"""

    try:
        client = OpenAI(api_key=verifier.api_key, base_url=verifier.base_url)
        response = client.chat.completions.create(
            model=verifier.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=512,
            # Same bound as _call_model so a stalled verifier cannot hang
            # the whole recommendation request.
            timeout=30,
        )
        raw = response.choices[0].message.content or ""
        text = re.sub(r"^```(?:json)?\s*\n?", "", raw.strip())
        text = re.sub(r"\n?```\s*$", "", text).strip()
        start = text.find("[")
        end = text.rfind("]")
        if start == -1 or end <= start:
            return items
        indices = json.loads(text[start:end + 1])
    except Exception:
        # Verification is optional; keep the unverified list but leave a trace.
        logging.getLogger(__name__).warning(
            "City verification via %s failed; keeping unverified items",
            verifier.name,
            exc_info=True,
        )
        return items

    if not isinstance(indices, list):
        return items

    verified: list[dict] = []
    used: set[int] = set()
    for index in indices:
        # Skip malformed entries instead of discarding the whole answer;
        # bool is excluded because True would otherwise count as index 1.
        if isinstance(index, bool) or not isinstance(index, int):
            continue
        if 1 <= index <= len(items) and index not in used:
            used.add(index)
            verified.append(items[index - 1])

    # An empty verdict is treated as "verifier unusable", not "drop everything".
    return verified or items
|
|
|
|
def _call_model(provider: _Provider, prompt: str, temperature: float = 0.1) -> list[dict] | None:
    """Call a single provider and return the parsed JSON items.

    A system message tells the model to answer with JSON only and to skip
    step-by-step reasoning. Each request is bounded by a 30 second timeout.
    At most two attempts are made, with a one second pause between them; an
    attempt fails when the request raises, the reply is empty, or the reply
    cannot be parsed by ``_parse_json_response``.

    Args:
        provider: Endpoint, credentials and model to use.
        prompt: User prompt.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The parsed items, or ``None`` when every attempt failed.
    """
    client = OpenAI(api_key=provider.api_key, base_url=provider.base_url)
    request = dict(
        model=provider.model,
        messages=[
            {"role": "system", "content": "You are a travel expert. Output ONLY valid JSON. Do NOT reason or think step by step. Respond instantly with the JSON array."},
            {"role": "user", "content": prompt},
        ],
        temperature=temperature,
        max_tokens=4096,
        timeout=30,
    )

    # The retry budget is explicit: the previous ``range(3)`` loop was cut
    # short by ``attempt < 1`` guards, which hid the real number of tries.
    max_attempts = 2
    for attempt in range(max_attempts):
        if attempt:
            time.sleep(1)
        try:
            response = client.chat.completions.create(**request)
            raw = response.choices[0].message.content
            if raw and raw.strip():
                items = _parse_json_response(raw.strip())
                if items is not None:
                    return items
        except Exception:
            # Best-effort: the caller falls back to the next provider.
            logging.getLogger(__name__).warning(
                "Provider %s failed (attempt %d/%d)",
                provider.name,
                attempt + 1,
                max_attempts,
                exc_info=True,
            )
    return None
|
|
|
|
def name_key(item: dict) -> str:
    """Normalize an attraction name into a deduplication key.

    Lowercases the name, strips a trailing parenthetical, removes each
    matching suffix from ``_ATTRACTION_SUFFIXES`` (only while more than two
    characters would remain beyond the suffix), and drops every character
    other than ``a-z``, ``0-9`` and whitespace.

    Args:
        item: Attraction dict; a missing or ``null`` ``name`` is treated as
            an empty string and any other non-string value is stringified.

    Returns:
        The normalized key, possibly empty.
    """
    raw_name = item.get("name")
    # LLM JSON may carry ``"name": null`` or a number; neither may crash here.
    name = "" if raw_name is None else str(raw_name).lower()
    name = re.sub(r"\s*\(.*?\)\s*$", "", name)
    for suffix in _ATTRACTION_SUFFIXES:
        # An empty suffix would make ``name[:-0]`` wipe the whole name.
        if suffix and name.endswith(suffix) and len(name) > len(suffix) + 2:
            name = name[:-len(suffix)].strip()
    name = re.sub(r"[^a-z0-9\s]", "", name)
    return name.strip()
|
|
|
|
def _enrich_and_verify_batch(items: list[dict], city: str) -> list[dict]:
    """Run image enrichment and coordinate verification concurrently.

    Non-dict entries (the model may emit bare strings or numbers) are dropped
    first. Returns the list produced by ``_verify_coordinates``; the result of
    ``_enrich_with_images`` is not inspected.
    """
    candidates = [item for item in items if isinstance(item, dict)]
    if not candidates:
        return []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        enrich_future = pool.submit(_enrich_with_images, candidates, city=city)
        verify_future = pool.submit(_verify_coordinates, candidates, city)
        concurrent.futures.wait([enrich_future, verify_future])
        return verify_future.result() or []


def _first_verified_batch(
    providers: list[_Provider],
    prompt: str,
    city: str,
    temperature: float,
    provider_log: list | None,
    retry: bool = False,
) -> list[dict]:
    """Return the first provider's items that survive coordinate verification.

    Providers are tried in order. One entry per call is appended to
    ``provider_log`` when it is given (tagged ``"retry": True`` on a retry
    pass). Returns an empty list when no provider produced verified items.
    """
    for provider in providers:
        started = time.time()
        items = _call_model(provider, prompt, temperature=temperature)
        elapsed = time.time() - started
        if provider_log is not None:
            entry = {
                "provider": provider.name,
                "model": provider.model,
                "status": "success" if items else "failed",
                "elapsed": round(elapsed, 1),
                "items": len(items) if items else 0,
            }
            if retry:
                entry["retry"] = True
            provider_log.append(entry)
        if not items:
            continue
        verified = _enrich_and_verify_batch(items, city)
        if verified:
            return verified
    return []


def _drop_controversial(items: list[dict]) -> list[dict]:
    """Filter out items whose name contains a blocked term."""
    blocked = ("yasukuni", "yasukuni shrine")
    return [
        item for item in items
        if not any(term in str(item.get("name") or "").lower() for term in blocked)
    ]


def _tidy_attraction_names(items: list[dict]) -> None:
    """Simplify item names in place.

    Keeps only the part before the first combining separator ("A & B" -> "A"),
    then strips a trailing parenthetical and a trailing ", Location" part.
    A name is never replaced by an empty string.
    """
    for item in items:
        name = str(item.get("name") or "")
        for sep in (" & ", " and ", " / ", "/", " &"):
            if sep in name:
                head = name.split(sep, 1)[0].strip()
                if head:
                    name = head
                    item["name"] = head
                break
        cleaned = re.sub(r"\s*\(.*?\)\s*$", "", name).strip()
        cleaned = re.sub(r",\s*[A-Za-z].*$", "", cleaned).strip()
        if cleaned:
            item["name"] = cleaned


def _dedupe_by_name_key(items: list[dict], seen: set, skip_blank: bool = False) -> list[dict]:
    """Return items whose ``name_key`` is not yet in ``seen``, updating ``seen``."""
    unique = []
    for item in items:
        key = name_key(item)
        if (skip_blank and not key) or key in seen:
            continue
        seen.add(key)
        unique.append(item)
    return unique


def get_recommendations(
    tab: str,
    city: str,
    num_attractions: int = 10,
    categories: dict | None = None,
    temperature: float = 0.1,
    provider_log: list | None = None,
) -> list[dict] | None:
    """Call the LLM providers to get up to ``num_attractions`` recommendations.

    Strategy:
      1. Build the prompt from ``PROMPT_MAP[tab]``, asking for four items more
         than needed. Only the first enabled category that has an entry in
         ``CATEGORY_GUIDANCE`` contributes guidance text.
      2. Try the providers from ``_get_providers_randomized()`` in order; the
         first whose output survives image enrichment + coordinate
         verification is used. If none succeeds, make one more full pass.
      3. Deduplicate by ``name_key``; when more items came back than were
         requested, run the model-based city verification.
      4. Drop blocked places, simplify names, and deduplicate again because
         cleanup can make two names collide.
      5. If still short and ``num_attractions <= 9``, request extras (second
         provider first, then the first) and pass them through the same
         verification, filter, cleanup and deduplication.

    Args:
        tab: Key into ``PROMPT_MAP``.
        city: Target city.
        num_attractions: Maximum number of items to return.
        categories: Mapping of category name to enabled flag.
        temperature: Sampling temperature for the model calls.
        provider_log: Optional list that receives one dict per provider call.

    Returns:
        At most ``num_attractions`` items, or ``None`` when no provider is
        configured or none returned usable items.
    """
    safety_note = "\n\nIMPORTANT: Do NOT include any politically controversial attractions, war museums, or memorials that might be offensive to some visitors. Focus on universally enjoyed tourist attractions."
    prompt_template = PROMPT_MAP[tab]

    category_prompt = ""
    if categories:
        guidance = [
            CATEGORY_GUIDANCE[cat].format(city=city)
            for cat, enabled in categories.items()
            if enabled and cat in CATEGORY_GUIDANCE
        ]
        if guidance:
            category_prompt = guidance[0]

    # Over-request so verification and filtering still leave enough items.
    request_count = num_attractions + 4
    prompt = prompt_template.format(
        category_prompt=category_prompt,
        num_attractions=request_count,
    ) + safety_note

    providers = _get_providers_randomized()
    if not providers:
        return None

    batch = _first_verified_batch(providers, prompt, city, temperature, provider_log)
    if not batch:
        batch = _first_verified_batch(
            providers, prompt, city, temperature, provider_log, retry=True
        )
    if not batch:
        return None

    seen_names: set[str] = set()
    merged = _dedupe_by_name_key(batch, seen_names)

    if len(merged) > request_count and len(providers) > 1:
        merged = _verify_with_model(merged, city, providers)

    merged = _drop_controversial(merged)
    _tidy_attraction_names(merged)

    # Cleanup changes the names, so keys taken before it are stale: re-key the
    # cleaned list (dropping collisions) and remember both key generations so
    # extras cannot re-introduce an attraction under either spelling.
    cleaned_keys: set[str] = set()
    merged = _dedupe_by_name_key(merged, cleaned_keys)
    seen_names |= cleaned_keys

    shortfall = num_attractions - len(merged)
    if shortfall > 0 and num_attractions <= 9:
        extras_prompt = prompt_template.format(
            category_prompt=category_prompt,
            num_attractions=shortfall + 3,
        ) + safety_note
        # Sorted so the prompt (and thus a temperature-0 answer) does not
        # depend on per-process set ordering.
        existing_names = sorted({name_key(item) for item in merged})[:20]
        listing = "\n".join(f"- {n}" for n in existing_names)
        extras_prompt += f"\n\nIMPORTANT: Do NOT include any of these already-listed attractions:\n{listing}"
        extras_prompt += "\n\nOnly return attractions NOT listed above."

        extras_provider = providers[1] if len(providers) > 1 else providers[0]
        extras_items = _call_model(extras_provider, extras_prompt, temperature=temperature)
        if not extras_items and len(providers) > 1:
            extras_items = _call_model(providers[0], extras_prompt, temperature=temperature)

        if extras_items:
            extras = _enrich_and_verify_batch(extras_items, city)
            # Extras get the same filter and cleanup as the main batch.
            extras = _drop_controversial(extras)
            _tidy_attraction_names(extras)
            merged.extend(_dedupe_by_name_key(extras, seen_names, skip_blank=True))

    return merged[:num_attractions]
|
|
|
|
def translate_items(items: list[dict], second_language: str, tab: str) -> list[dict]:
    """Translate recommendation items into a second language via the LLM.

    Each provider from ``_get_providers_randomized()`` is tried in order until
    one returns a usable translation. Translations are added as new
    ``<field>_local`` keys on copies of the items; originals are not mutated.

    Per provider, up to three requests are made: an empty or unparseable reply
    (or an exception) is retried once; a well-formed reply that lacks the
    expected ``_local`` keys is retried with an added warning and accepted
    as-is on the third attempt. A reply with the wrong number of items moves
    straight on to the next provider.

    Args:
        items: Items to translate.
        second_language: Target language name; empty disables translation.
        tab: Unused; kept for interface compatibility.

    Returns:
        Translated copies of the items, or ``items`` itself when translation
        is disabled, nothing is translatable, no provider is configured, or
        every provider failed.
    """
    if not second_language or not items:
        return items

    # Look at every item, not just the first, so a field missing from the
    # first item is still translated for the others.
    fields = [
        k for k in ("name", "short_description", "description", "tip")
        if any(k in item for item in items)
    ]
    if not fields:
        # Nothing translatable: do not spend an LLM call on an empty request.
        return items

    providers = _get_providers_randomized()
    if not providers:
        return items

    # Image URLs are irrelevant for translation and only inflate the prompt.
    items_for_llm = [
        {k: v for k, v in item.items() if k != "image_url"}
        for item in items
    ]
    items_json = json.dumps(items_for_llm, ensure_ascii=False, indent=2)

    translation_keys = ", ".join(f'"{f}_local": translate the value of "{f}" into {second_language}' for f in fields)
    trans_example = "\n".join(f" // {f} → {f}_local (translated)" for f in fields[:2])
    local_keys = ", ".join(f'"{f}_local"' for f in fields)

    prompt = f"""You are a professional translator. Translate the following JSON array of travel recommendations into {second_language}.

CRITICAL: If the target language is Traditional Chinese, you MUST use Traditional Chinese characters (繁體字), NOT Simplified Chinese (简体字). Use characters like 的, 們, 國, 會, 後, 發, 時 instead of 的, 们, 国, 会, 后, 发, 时.

For EACH object in the input array, you MUST add these new keys:
{translation_keys}

{trans_example}

IMPORTANT: The "_local" keys are NEW keys alongside the original ones. Do NOT remove or change the original English keys. Every object MUST have {local_keys} added.

Input:
{items_json}

Return ONLY the complete JSON array with ALL original English keys AND ALL new "_local" translation keys. No markdown fences, no extra text."""

    system_message = {"role": "system", "content": "You are a professional translator. Output ONLY valid JSON. Do NOT reason or think step by step."}
    warning = "\n\nWARNING: Your previous response did NOT include any '_local' fields. You MUST add them. Every object must have " + local_keys + ". No exceptions."

    # A translation counts as present when the name was translated; if names
    # are not part of the request, any requested "_local" key will do.
    probe_keys = ["name_local"] if "name" in fields else [f"{f}_local" for f in fields]
    log = logging.getLogger(__name__)

    for provider in providers:
        client = OpenAI(api_key=provider.api_key, base_url=provider.base_url)
        kwargs = dict(
            model=provider.model,
            messages=[system_message, {"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=8192,
        )
        if provider.name == "ollama-cloud":
            kwargs["extra_body"] = {"think": False}

        for attempt in range(3):
            try:
                response = client.chat.completions.create(**kwargs)
                raw = response.choices[0].message.content
                translated = _parse_json_response(raw.strip()) if raw and raw.strip() else None
                if isinstance(translated, list):
                    if len(translated) != len(items):
                        # Items cannot be paired up reliably; try the next provider.
                        break
                    merged = []
                    for orig, trans in zip(items, translated):
                        item = dict(orig)
                        for k, v in trans.items():
                            if k.endswith("_local"):
                                item[k] = v
                        merged.append(item)

                    has_local = any(key in it for it in merged for key in probe_keys)
                    if has_local or attempt >= 2:
                        return merged
                    # The model echoed the input without translations: ask again, louder.
                    kwargs["messages"] = [system_message, {"role": "user", "content": prompt + warning}]
                    time.sleep(1)
                    continue
            except Exception:
                log.warning(
                    "Translation via %s failed (attempt %d)",
                    provider.name,
                    attempt + 1,
                    exc_info=True,
                )
            # Empty/unparseable reply or exception: one retry, then next provider.
            if attempt < 1:
                time.sleep(1)
                continue
            break

    return items
|
|
|
|
| |
|
|
def clear_llm_caches() -> None:
    """Empty the LLM result and translation caches and persist the empty state.

    Image and geocode caches are left alone (they are stable per attraction).
    Intended to be called when the user clicks Clear in the UI.
    """
    # Clear both in-memory caches first, then write both back out.
    for cache in (_LLM_CACHE, _TRANSLATION_CACHE):
        cache.clear()
    for persist in (_save_llm_cache, _save_translation_cache):
        persist()
|
|
|
|
def get_recommendations_cached(
    city: str,
    num_attractions: int = 10,
    categories: dict | None = None,
    temperature: float = 0,
    provider_log: list | None = None,
) -> list[dict] | None:
    """Cached front-end to ``get_recommendations`` for the attractions tab.

    The cache key is ``(city, categories-as-sorted-JSON)``; ``num_attractions``
    is deliberately not part of it. Every fetch asks ``get_recommendations``
    for 19 items and the result is trimmed to ``num_attractions`` on return,
    so changing the count reuses one cache entry.

    With ``temperature > 0`` the cache is neither read nor written, so each
    call produces fresh results. Otherwise a cached entry is served when
    present, and a new result is fetched at temperature 0, stored and
    persisted.

    Returns:
        Up to ``num_attractions`` items, or ``None`` when the fetch failed.
    """
    cat_hash = json.dumps(categories or {}, sort_keys=True)
    key = (city, cat_hash)
    use_cache = not temperature > 0

    if use_cache:
        hit = _LLM_CACHE.get(key)
        if hit is not None:
            return hit[:num_attractions]

    fresh = get_recommendations(
        tab="attractions",
        city=city,
        num_attractions=19,
        categories=categories,
        temperature=0 if use_cache else temperature,
        provider_log=provider_log,
    )
    if fresh is None:
        return None

    if use_cache:
        _LLM_CACHE[key] = fresh
        _save_llm_cache()
    return fresh[:num_attractions]
|
|
|
|
| |
| _DEEP_TR_LANG_MAP = { |
| "Korean": "ko", |
| "Japanese": "ja", |
| "Traditional Chinese": "zh-TW", |
| "Simplified Chinese": "zh-CN", |
| "Chinese Simplified": "zh-CN", |
| "French": "fr", |
| "Spanish": "es", |
| "German": "de", |
| "Italian": "it", |
| "Portuguese": "pt", |
| "Arabic": "ar", |
| "Russian": "ru", |
| "Dutch": "nl", |
| "Thai": "th", |
| "Vietnamese": "vi", |
| "Turkish": "tr", |
| "Greek": "el", |
| "Polish": "pl", |
| "Swedish": "sv", |
| "Danish": "da", |
| "Finnish": "fi", |
| "Norwegian": "no", |
| "Czech": "cs", |
| "Romanian": "ro", |
| "Hungarian": "hu", |
| "Hebrew": "he", |
| "Hindi": "hi", |
| "Indonesian": "id", |
| "Malay": "ms", |
| } |
|
|
| _TRANSLATION_FIELDS = ("name", "short_description", "description", "tip") |
|
|
|
|
def _translate_items_deep(items: list[dict], second_language: str) -> list[dict] | None:
    """Translate items with deep-translator's GoogleTranslator.

    Produces the same ``<field>_local`` keys as the LLM translator, so callers
    do not need to know which backend ran. Every non-empty string field listed
    in ``_TRANSLATION_FIELDS`` is translated in its own request, and the
    requests run concurrently on a thread pool.

    Args:
        items: Items to translate; they are copied, not mutated.
        second_language: Language name looked up in ``_DEEP_TR_LANG_MAP``.

    Returns:
        Translated copies of the items; ``items`` itself when there is no text
        to translate; or ``None`` when this backend cannot be used (language
        not mapped, deep-translator missing or rejecting the language, the
        pool failing, or no text translated successfully) so that the caller
        can fall back to the LLM path.
    """
    lang_code = _DEEP_TR_LANG_MAP.get(second_language)
    if not lang_code:
        return None

    try:
        from deep_translator import GoogleTranslator
        # Constructed only as a probe: fails fast if the package is missing
        # or it rejects the language code.
        GoogleTranslator(source="en", target=lang_code)
    except Exception:
        return None

    # Flatten every translatable text, remembering where each belongs.
    texts_to_translate: list[str] = []
    positions: list[tuple[int, str]] = []
    for index, item in enumerate(items):
        for field in _TRANSLATION_FIELDS:
            text = item.get(field, "")
            if isinstance(text, str) and text.strip():
                texts_to_translate.append(text.strip())
                positions.append((index, field))

    if not texts_to_translate:
        return items

    def _do_translate(text: str) -> str:
        # A separate translator instance per call, so no state is shared
        # between worker threads.
        try:
            return GoogleTranslator(source="en", target=lang_code).translate(text) or ""
        except Exception:
            return ""

    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as pool:
            translated_texts = list(pool.map(_do_translate, texts_to_translate))
    except Exception:
        return None

    # If nothing was translated (service blocked, offline, rate limited),
    # report failure rather than copying English into the "_local" fields,
    # which would both suppress the LLM fallback and be cached as a success.
    if not any(translated_texts):
        return None

    result: list[dict] = [dict(item) for item in items]
    for (index, field), translated in zip(positions, translated_texts):
        # An individual failure falls back to the source text for that field.
        result[index][field + "_local"] = translated or result[index].get(field, "")
    return result
|
|
|
|
def translate_items_cached(items: list[dict], second_language: str, city: str, categories: dict | None = None) -> list[dict]:
    """Translate items, reusing a cached translation when it fits.

    The cache key is ``(city, categories-as-sorted-JSON, language)``. Because
    the key does not describe the items themselves, a cached entry is used
    only when it covers the requested items: it must be at least as long and
    carry the same ``name`` at every position. A longer entry is trimmed to
    the requested length.

    On a miss, deep-translator (Google Translate) is tried first, then the
    LLM translator. Only results that actually contain ``_local`` keys are
    stored, so a failed translation is attempted again on the next call.

    Args:
        items: Items to translate.
        second_language: Target language name.
        city: City the items belong to (cache key component).
        categories: Category flags used for the search (cache key component).

    Returns:
        The translated items, or the untranslated items when both backends
        failed.
    """
    cat_hash = json.dumps(categories or {}, sort_keys=True)
    key = (city, cat_hash, second_language)

    cached = _TRANSLATION_CACHE.get(key)
    if isinstance(cached, list) and len(cached) >= len(items):
        same_items = all(
            isinstance(entry, dict) and entry.get("name") == item.get("name")
            for entry, item in zip(cached, items)
        )
        if same_items:
            return cached if len(cached) == len(items) else cached[:len(items)]

    # Primary path: deep-translator; fallback: LLM.
    result = _translate_items_deep(items, second_language)
    if result is None:
        result = translate_items(items, second_language, "attractions")

    translated_ok = any(
        isinstance(entry, dict) and any(str(k).endswith("_local") for k in entry)
        for entry in result
    )
    if translated_ok:
        _TRANSLATION_CACHE[key] = result
        _save_translation_cache()
    return result