roamify / scripts /prewarm_remaining.py
jofaichow's picture
cache prewarm + concurrency improvements
7b2fc2d
#!/usr/bin/env python3
"""
Pre-warm LLM cache for remaining uncached cities.
Processes combos concurrently (2 workers) to maximize throughput while
respecting Nominatim's 1 req/s rate limit via a thread-safe limiter.
Each worker randomly picks between OpenRouter DeepSeek and Ollama Cloud
DeepSeek as the primary provider (via _get_providers_randomized), splitting
the workload and reducing rate-limit pressure on either provider.
Usage:
cd roamify && python scripts/prewarm_remaining.py
"""
import json
import os
import random
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from dotenv import load_dotenv
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "..", ".env"), override=True)
from services.recommender import (
get_recommendations_cached,
_LLM_CACHE,
_save_llm_cache,
_save_image_cache,
_save_geocode_cache,
)
# Category labels; each combo enables exactly one of these at a time.
CATEGORY_NAMES = [
    "Landmark",
    "Culture",
    "Nature",
    "Gems",
    "Photo",
    "Food",
    "Shopping",
]

# Cities whose recommendations have not been cached yet. Despite the name,
# this is the "still uncached" list; trim it as more cities get prewarmed.
UNCATEGORIZED_CITIES = [
    "Montreal",
    "Moscow",
    "Osaka",
    "Oslo",
    "Reykjavik",
    "Santiago",
    "Shanghai",
    "Stockholm",
    "Taipei",
    "Tel Aviv",
    "Toronto",
    "Vancouver",
    "Venice",
    "Warsaw",
    "Washington",
]

# Outcome counters shared by all worker threads; guard every update with
# _COMBO_LOCK.
_COMBO_STATS = dict.fromkeys(("success", "skipped", "fail", "total"), 0)
_COMBO_LOCK = threading.Lock()
def process_combo(city: str, cat_name: str, combo_idx: int, total: int) -> None:
    """Pre-warm the cache for one city/category combo and record the outcome.

    Builds a category selection with only ``cat_name`` enabled, skips the
    combo if it is already present in ``_LLM_CACHE``, and otherwise calls
    ``get_recommendations_cached``. The outcome is tallied in
    ``_COMBO_STATS`` as ``"skipped"``, ``"success"`` or ``"fail"``.

    This function runs on several worker threads at once, so every message
    is printed as one complete, self-identifying line while holding
    ``_COMBO_LOCK``. (Printing a partial line with ``end=" "`` and finishing
    it later lets another worker's output land in between, which makes the
    result impossible to attribute.)

    Args:
        city: City name to request recommendations for.
        cat_name: The single category to enable; expected to be one of
            ``CATEGORY_NAMES``.
        combo_idx: 1-based position of this combo, used for display only.
        total: Total number of combos, used for display only.

    Returns:
        None. Exceptions raised by ``get_recommendations_cached`` are caught
        and reported as failures rather than propagated.
    """
    # Exactly one category flag is True per combo.
    categories = {name: (name == cat_name) for name in CATEGORY_NAMES}
    # NOTE(review): assumes _LLM_CACHE is keyed by (city, sorted-JSON of the
    # category flags) -- confirm against services.recommender.
    cat_hash = json.dumps(categories, sort_keys=True)
    prefix = f" [{combo_idx:>3}/{total}]"
    label = f"{city} / {cat_name}"

    def report(message: str, outcome=None) -> None:
        # Update the counter and print under the same lock so lines from
        # different workers never interleave and stats stay consistent.
        # _COMBO_LOCK is a plain (non-reentrant) Lock: never call report()
        # while already holding it.
        with _COMBO_LOCK:
            if outcome is not None:
                _COMBO_STATS[outcome] += 1
            print(f"{prefix} {message}", flush=True)

    if (city, cat_hash) in _LLM_CACHE:
        report(f"⏭️ {label} — already cached", "skipped")
        return

    report(f"🔍 {label}...")
    # perf_counter is monotonic, so elapsed times cannot go negative if the
    # wall clock is adjusted mid-request.
    start = time.perf_counter()
    try:
        result = get_recommendations_cached(
            city=city,
            num_attractions=6,
            categories=categories,
            temperature=0,
        )
    except Exception as e:
        # Best-effort batch job: one failing combo must not stop the others.
        elapsed = time.perf_counter() - start
        report(f"❌ {label} — error after {elapsed:.1f}s: {e}", "fail")
        return

    elapsed = time.perf_counter() - start
    if result:
        report(f"✅ {label} — {len(result)} items in {elapsed:.1f}s", "success")
    else:
        # Any falsy result (None or an empty collection) counts as a failure.
        report(f"❌ {label} — returned None in {elapsed:.1f}s", "fail")
def prewarm(max_workers: int = 2) -> None:
    """Pre-warm every city/category combo concurrently, then save the caches.

    Builds the cross product of ``UNCATEGORIZED_CITIES`` and
    ``CATEGORY_NAMES``, shuffles it to spread the load across workers, and
    runs ``process_combo`` for each entry on a thread pool. A summary is
    printed on normal completion. The three cache-save helpers are called in
    a ``finally`` block, so they also run when the batch is interrupted
    (e.g. Ctrl-C) or fails unexpectedly.

    Args:
        max_workers: Number of worker threads. Defaults to 2, the value the
            script was written for.

    Returns:
        None.
    """
    total_combos = len(UNCATEGORIZED_CITIES) * len(CATEGORY_NAMES)
    # Reset the shared counters so a second call in the same process does
    # not report totals accumulated by an earlier run.
    with _COMBO_LOCK:
        _COMBO_STATS.update(success=0, skipped=0, fail=0, total=total_combos)
    llm_before = len(_LLM_CACHE)

    print(f"Pre-warming caches: {len(UNCATEGORIZED_CITIES)} cities × {len(CATEGORY_NAMES)} categories = {total_combos} combos")
    print(f" Workers: {max_workers} (concurrent) — each uses random DeepSeek provider")
    print(f" Existing LLM cache entries: {llm_before}")
    print()

    # Shuffle first, then number: the index is for display only and should
    # follow submission order.
    combos = [
        (city, cat_name)
        for city in UNCATEGORIZED_CITIES
        for cat_name in CATEGORY_NAMES
    ]
    random.shuffle(combos)

    try:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {
                pool.submit(process_combo, city, cat_name, idx, total_combos): (city, cat_name)
                for idx, (city, cat_name) in enumerate(combos, start=1)
            }
            try:
                for future in as_completed(futures):
                    try:
                        future.result()
                    except Exception as exc:
                        # process_combo handles errors from the recommender
                        # itself; anything reaching here is unexpected, so
                        # surface it and count it instead of dropping it.
                        city, cat_name = futures[future]
                        with _COMBO_LOCK:
                            _COMBO_STATS["fail"] += 1
                            print(f" ❌ {city} / {cat_name} — unexpected error: {exc!r}", flush=True)
            except BaseException:
                # Interrupted (e.g. KeyboardInterrupt): drop combos that have
                # not started so the pool shuts down promptly.
                for pending in futures:
                    pending.cancel()
                raise

        # Summary
        llm_new = len(_LLM_CACHE) - llm_before
        print()
        print("═" * 55)
        print("Pre-warm complete!")
        print(f" Combos: {_COMBO_STATS['success']} succeeded, {_COMBO_STATS['skipped']} skipped, {_COMBO_STATS['fail']} failed")
        print(f" New LLM cache entries: {llm_new} (total: {len(_LLM_CACHE)})")
    finally:
        # Save whatever was gathered, even on an aborted run.
        _save_llm_cache()
        _save_image_cache()
        _save_geocode_cache()
        print()
        print("All caches saved to disk ✅")
# Script entry point: start the pre-warm only when this file is executed
# directly, never as a side effect of being imported.
if __name__ == "__main__":
    prewarm()