jofaichow commited on
Commit
7b2fc2d
·
1 Parent(s): 584bf94

cache prewarm + concurrency improvements

Browse files

- Prewarm Osaka (7 cats, 14 translations) — 47/61 cities cached
- Thread-safe Nominatim rate limiter for concurrent workers
- Provider randomization splits load across OpenRouter + Ollama Cloud
- Increased LLM timeout 60→120s to reduce retry cascades
- Concurrent 2-worker prewarm script

.geocode_cache.json CHANGED
The diff for this file is too large to render. See raw diff
 
.image_cache.json CHANGED
The diff for this file is too large to render. See raw diff
 
.llm_cache.json CHANGED
The diff for this file is too large to render. See raw diff
 
.translation_cache.json CHANGED
The diff for this file is too large to render. See raw diff
 
scripts/prewarm_remaining.py CHANGED
@@ -1,20 +1,25 @@
1
  #!/usr/bin/env python3
2
  """
3
- Pre-warm LLM cache for all 33 cities that have zero cache entries.
4
 
5
- Runs all 7 categories for each city. Skips translation — only populates:
6
- - .llm_cache.json → instant repeat lookups
7
- - .image_cache.json → instant image loads
8
- - .geocode_cache.json instant geocoding
 
 
9
 
10
  Usage:
11
  cd roamify && python scripts/prewarm_remaining.py
12
  """
13
 
 
14
  import os
 
15
  import sys
 
16
  import time
17
- import json
18
 
19
  sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
20
 
@@ -24,8 +29,6 @@ load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "..", ".env"), o
24
  from services.recommender import (
25
  get_recommendations_cached,
26
  _LLM_CACHE,
27
- _IMAGE_CACHE,
28
- _GEOCODE_CACHE,
29
  _save_llm_cache,
30
  _save_image_cache,
31
  _save_geocode_cache,
@@ -33,78 +36,99 @@ from services.recommender import (
33
 
34
  CATEGORY_NAMES = ["Landmark", "Culture", "Nature", "Gems", "Photo", "Food", "Shopping"]
35
 
 
36
  UNCATEGORIZED_CITIES = [
37
- "Beijing", "Buenos Aires", "Cairo", "Chicago", "Copenhagen",
38
- "Delhi", "Dublin", "Edinburgh", "Florence", "Hanoi",
39
- "Honolulu", "Kuala Lumpur", "Las Vegas", "Los Angeles", "Madrid",
40
- "Melbourne", "Miami", "Milan", "Montreal", "Moscow",
41
- "Osaka", "Oslo", "Reykjavik", "Santiago", "Shanghai",
42
- "Stockholm", "Taipei", "Tel Aviv", "Toronto", "Vancouver",
43
- "Venice", "Warsaw", "Washington",
44
  ]
45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
  def prewarm():
 
48
  total_combos = len(UNCATEGORIZED_CITIES) * len(CATEGORY_NAMES)
49
- success = 0
50
- skipped = 0
51
- fail = 0
52
 
53
  llm_before = len(_LLM_CACHE)
54
 
55
  print(f"Pre-warming caches: {len(UNCATEGORIZED_CITIES)} cities × {len(CATEGORY_NAMES)} categories = {total_combos} combos")
 
56
  print(f" Existing LLM cache entries: {llm_before}")
57
  print()
58
 
59
- combo_idx = 0
 
 
60
  for city in UNCATEGORIZED_CITIES:
61
  for cat_name in CATEGORY_NAMES:
62
- combo_idx += 1
63
- categories = {name: (name == cat_name) for name in CATEGORY_NAMES}
64
- cat_hash = json.dumps(categories, sort_keys=True)
65
-
66
- if (city, cat_hash) in _LLM_CACHE:
67
- print(f" [{combo_idx:>3}/{total_combos}] ⏭️ {city} / {cat_name} already cached")
68
- skipped += 1
69
- continue
70
-
71
- print(f" [{combo_idx:>3}/{total_combos}] 🔍 {city} / {cat_name}...", end=" ", flush=True)
72
- start = time.time()
 
 
 
 
73
  try:
74
- result = get_recommendations_cached(
75
- city=city,
76
- num_attractions=6,
77
- categories=categories,
78
- temperature=0,
79
- )
80
- elapsed = time.time() - start
81
- if result:
82
- items = len(result)
83
- print(f"✅ {items} items in {elapsed:.1f}s")
84
- success += 1
85
- else:
86
- print(f"❌ returned None in {elapsed:.1f}s")
87
- fail += 1
88
- except Exception as e:
89
- elapsed = time.time() - start
90
- print(f"❌ error after {elapsed:.1f}s: {e}")
91
- fail += 1
92
-
93
- # Nominatim rate-limit pause
94
- time.sleep(1.5)
95
 
96
  # Summary
97
  llm_new = len(_LLM_CACHE) - llm_before
98
- image_new = len(_IMAGE_CACHE)
99
- geo_new = len(_GEOCODE_CACHE)
100
 
101
  print()
102
  print("═" * 55)
103
  print("Pre-warm complete!")
104
- print(f" Combos: {success} succeeded, {skipped} skipped, {fail} failed")
105
  print(f" New LLM cache entries: {llm_new} (total: {len(_LLM_CACHE)})")
106
- print(f" Image cache entries: {image_new}")
107
- print(f" Geocode cache entries: {geo_new}")
108
 
109
  _save_llm_cache()
110
  _save_image_cache()
 
1
  #!/usr/bin/env python3
2
  """
3
+ Pre-warm LLM cache for remaining uncached cities.
4
 
5
+ Processes combos concurrently (2 workers) to maximize throughput while
6
+ respecting Nominatim's 1 req/s rate limit via a thread-safe limiter.
7
+
8
+ Each worker randomly picks between OpenRouter DeepSeek and Ollama Cloud
9
+ DeepSeek as the primary provider (via _get_providers_randomized), splitting
10
+ the workload and reducing rate-limit pressure on either provider.
11
 
12
  Usage:
13
  cd roamify && python scripts/prewarm_remaining.py
14
  """
15
 
16
+ import json
17
  import os
18
+ import random
19
  import sys
20
+ import threading
21
  import time
22
+ from concurrent.futures import ThreadPoolExecutor, as_completed
23
 
24
  sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
25
 
 
29
  from services.recommender import (
30
  get_recommendations_cached,
31
  _LLM_CACHE,
 
 
32
  _save_llm_cache,
33
  _save_image_cache,
34
  _save_geocode_cache,
 
36
 
37
  CATEGORY_NAMES = ["Landmark", "Culture", "Nature", "Gems", "Photo", "Food", "Shopping"]
38
 
39
+ # 15 cities still uncached (updated as more get prewarmed)
40
  UNCATEGORIZED_CITIES = [
41
+ "Montreal", "Moscow", "Osaka", "Oslo", "Reykjavik",
42
+ "Santiago", "Shanghai", "Stockholm", "Taipei", "Tel Aviv",
43
+ "Toronto", "Vancouver", "Venice", "Warsaw", "Washington",
 
 
 
 
44
  ]
45
 
46
+ _COMBO_STATS = {"success": 0, "skipped": 0, "fail": 0, "total": 0}
47
+ _COMBO_LOCK = threading.Lock()
48
+
49
+
50
+ def process_combo(city: str, cat_name: str, combo_idx: int, total: int) -> None:
51
+ """Process a single city/category combo and update stats."""
52
+ categories = {name: (name == cat_name) for name in CATEGORY_NAMES}
53
+ cat_hash = json.dumps(categories, sort_keys=True)
54
+
55
+ if (city, cat_hash) in _LLM_CACHE:
56
+ with _COMBO_LOCK:
57
+ _COMBO_STATS["skipped"] += 1
58
+ print(f" [{combo_idx:>3}/{total}] ⏭️ {city} / {cat_name} — already cached", flush=True)
59
+ return
60
+
61
+ print(f" [{combo_idx:>3}/{total}] 🔍 {city} / {cat_name}...", end=" ", flush=True)
62
+ start = time.time()
63
+ try:
64
+ result = get_recommendations_cached(
65
+ city=city,
66
+ num_attractions=6,
67
+ categories=categories,
68
+ temperature=0,
69
+ )
70
+ elapsed = time.time() - start
71
+ if result:
72
+ items = len(result)
73
+ with _COMBO_LOCK:
74
+ _COMBO_STATS["success"] += 1
75
+ print(f"✅ {items} items in {elapsed:.1f}s", flush=True)
76
+ else:
77
+ with _COMBO_LOCK:
78
+ _COMBO_STATS["fail"] += 1
79
+ print(f"❌ returned None in {elapsed:.1f}s", flush=True)
80
+ except Exception as e:
81
+ elapsed = time.time() - start
82
+ with _COMBO_LOCK:
83
+ _COMBO_STATS["fail"] += 1
84
+ print(f"❌ error after {elapsed:.1f}s: {e}", flush=True)
85
+
86
 
87
  def prewarm():
88
+ """Run all combos concurrently with 2 workers."""
89
  total_combos = len(UNCATEGORIZED_CITIES) * len(CATEGORY_NAMES)
90
+ _COMBO_STATS["total"] = total_combos
 
 
91
 
92
  llm_before = len(_LLM_CACHE)
93
 
94
  print(f"Pre-warming caches: {len(UNCATEGORIZED_CITIES)} cities × {len(CATEGORY_NAMES)} categories = {total_combos} combos")
95
+ print(f" Workers: 2 (concurrent) — each uses random DeepSeek provider")
96
  print(f" Existing LLM cache entries: {llm_before}")
97
  print()
98
 
99
+ # Build all combos, shuffle for load distribution across workers
100
+ all_combos = []
101
+ idx = 0
102
  for city in UNCATEGORIZED_CITIES:
103
  for cat_name in CATEGORY_NAMES:
104
+ idx += 1
105
+ all_combos.append((city, cat_name, idx))
106
+
107
+ random.shuffle(all_combos)
108
+ # Re-assign sequential indices after shuffle (for display only)
109
+ for i, (city, cat_name, _) in enumerate(all_combos):
110
+ all_combos[i] = (city, cat_name, i + 1)
111
+
112
+ with ThreadPoolExecutor(max_workers=2) as pool:
113
+ futures = [
114
+ pool.submit(process_combo, city, cat_name, idx, total_combos)
115
+ for city, cat_name, idx in all_combos
116
+ ]
117
+ # Process results as they complete
118
+ for future in as_completed(futures):
119
  try:
120
+ future.result()
121
+ except Exception:
122
+ pass # Errors are already logged in process_combo
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
 
124
  # Summary
125
  llm_new = len(_LLM_CACHE) - llm_before
 
 
126
 
127
  print()
128
  print("═" * 55)
129
  print("Pre-warm complete!")
130
+ print(f" Combos: {_COMBO_STATS['success']} succeeded, {_COMBO_STATS['skipped']} skipped, {_COMBO_STATS['fail']} failed")
131
  print(f" New LLM cache entries: {llm_new} (total: {len(_LLM_CACHE)})")
 
 
132
 
133
  _save_llm_cache()
134
  _save_image_cache()
src/services/recommender.py CHANGED
@@ -109,6 +109,11 @@ def _save_image_cache() -> None:
109
  _GEOCODE_CACHE: dict[str, dict | None] = {}
110
  _load_geocode_cache() # Restore persisted cache from disk
111
 
 
 
 
 
 
112
  # Module-level cache for image enrichment results — keyed by (name, city, country) -> image URL
113
  # Never cleared, survives "Clear" clicks. Image URLs are stable per attraction.
114
  _IMAGE_CACHE: dict[tuple[str, str, str], str] = {}
@@ -644,7 +649,14 @@ def _nominatim_search_cached(query: str, timeout: int = 10) -> tuple[dict | None
644
  "q": query, "format": "json", "limit": 1, "accept-language": "en",
645
  })
646
  data = _http_get_json(url, timeout=timeout, retries=2)
647
- time.sleep(1.01) # Nominatim rate limit: 1 req/s (only on actual API calls)
 
 
 
 
 
 
 
648
  if data and isinstance(data, list) and data:
649
  _GEOCODE_CACHE[query] = data[0]
650
  _save_geocode_cache()
@@ -871,6 +883,20 @@ def _get_providers() -> list[_Provider]:
871
  return providers
872
 
873
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
874
  def _parse_json_response(raw: str) -> list[dict] | None:
875
  """Robustly extract JSON array from LLM output.
876
  Returns None if parsing fails entirely (caller should show st.error)."""
@@ -994,7 +1020,7 @@ def _call_model(provider: _Provider, prompt: str, temperature: float = 0.1) -> l
994
  ],
995
  temperature=temperature,
996
  max_tokens=4096,
997
- timeout=60,
998
  )
999
  # Ollama Cloud supports the "think" parameter natively via extra_body
1000
  if provider.name == "ollama-cloud":
@@ -1054,7 +1080,7 @@ def get_recommendations(
1054
  )
1055
  prompt += "\n\nIMPORTANT: Do NOT include any politically controversial attractions, war museums, or memorials that might be offensive to some visitors. Focus on universally enjoyed tourist attractions."
1056
 
1057
- providers = _get_providers()
1058
  if not providers:
1059
  return None
1060
 
@@ -1203,7 +1229,7 @@ def translate_items(items: list[dict], second_language: str, tab: str) -> list[d
1203
  if not second_language or not items:
1204
  return items
1205
 
1206
- providers = _get_providers()
1207
  if not providers:
1208
  return items
1209
 
 
109
  _GEOCODE_CACHE: dict[str, dict | None] = {}
110
  _load_geocode_cache() # Restore persisted cache from disk
111
 
112
+ # Thread-safe Nominatim rate limiter — ensures max 1 API call per second
113
+ # across all threads (prewarm with concurrent workers, image enrichment, etc.)
114
+ _nominatim_lock = threading.Lock()
115
+ _nominatim_last_call: float = 0.0
116
+
117
  # Module-level cache for image enrichment results — keyed by (name, city, country) -> image URL
118
  # Never cleared, survives "Clear" clicks. Image URLs are stable per attraction.
119
  _IMAGE_CACHE: dict[tuple[str, str, str], str] = {}
 
649
  "q": query, "format": "json", "limit": 1, "accept-language": "en",
650
  })
651
  data = _http_get_json(url, timeout=timeout, retries=2)
652
+ # Thread-safe Nominatim rate limit: 1 req/s (ensured across all concurrent workers)
653
+ global _nominatim_last_call
654
+ with _nominatim_lock:
655
+ now = time.time()
656
+ since_last = now - _nominatim_last_call
657
+ if since_last < 1.01:
658
+ time.sleep(1.01 - since_last)
659
+ _nominatim_last_call = time.time()
660
  if data and isinstance(data, list) and data:
661
  _GEOCODE_CACHE[query] = data[0]
662
  _save_geocode_cache()
 
883
  return providers
884
 
885
 
886
+ def _get_providers_randomized() -> list[_Provider]:
887
+ """Same as _get_providers but randomly orders the two DeepSeek V4 Flash
888
+ providers (OpenRouter and Ollama Cloud) so load is distributed and rate
889
+ limits are less likely to be hit on either provider."""
890
+ providers = _get_providers()
891
+ # Shuffle the first two DeepSeek providers if both are present
892
+ if len(providers) >= 2 and all(p.name in ("openrouter-deepseek", "ollama-cloud") for p in providers[:2]):
893
+ import random
894
+ p0, p1 = providers[0], providers[1]
895
+ if random.random() < 0.5:
896
+ providers[0], providers[1] = p1, p0
897
+ return providers
898
+
899
+
900
  def _parse_json_response(raw: str) -> list[dict] | None:
901
  """Robustly extract JSON array from LLM output.
902
  Returns None if parsing fails entirely (caller should show st.error)."""
 
1020
  ],
1021
  temperature=temperature,
1022
  max_tokens=4096,
1023
+ timeout=120,
1024
  )
1025
  # Ollama Cloud supports the "think" parameter natively via extra_body
1026
  if provider.name == "ollama-cloud":
 
1080
  )
1081
  prompt += "\n\nIMPORTANT: Do NOT include any politically controversial attractions, war museums, or memorials that might be offensive to some visitors. Focus on universally enjoyed tourist attractions."
1082
 
1083
+ providers = _get_providers_randomized()
1084
  if not providers:
1085
  return None
1086
 
 
1229
  if not second_language or not items:
1230
  return items
1231
 
1232
+ providers = _get_providers_randomized()
1233
  if not providers:
1234
  return items
1235