File size: 4,718 Bytes
584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 7b2fc2d 584bf94 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | #!/usr/bin/env python3
"""
Pre-warm LLM cache for remaining uncached cities.
Processes combos concurrently (2 workers) to maximize throughput while
respecting Nominatim's 1 req/s rate limit via a thread-safe limiter.
Each worker randomly picks between OpenRouter DeepSeek and Ollama Cloud
DeepSeek as the primary provider (via _get_providers_randomized), splitting
the workload and reducing rate-limit pressure on either provider.
Usage:
cd roamify && python scripts/prewarm_remaining.py
"""
import json
import os
import random
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from dotenv import load_dotenv
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "..", ".env"), override=True)
from services.recommender import (
get_recommendations_cached,
_LLM_CACHE,
_save_llm_cache,
_save_image_cache,
_save_geocode_cache,
)
CATEGORY_NAMES = ["Landmark", "Culture", "Nature", "Gems", "Photo", "Food", "Shopping"]
# 15 cities still uncached (updated as more get prewarmed)
UNCATEGORIZED_CITIES = [
"Montreal", "Moscow", "Osaka", "Oslo", "Reykjavik",
"Santiago", "Shanghai", "Stockholm", "Taipei", "Tel Aviv",
"Toronto", "Vancouver", "Venice", "Warsaw", "Washington",
]
_COMBO_STATS = {"success": 0, "skipped": 0, "fail": 0, "total": 0}
_COMBO_LOCK = threading.Lock()
def _log_outcome(outcome: str, message: str) -> None:
    """Count one combo outcome and print its progress line as a single step.

    Args:
        outcome: Key of ``_COMBO_STATS`` to increment ("success", "skipped" or "fail").
        message: Complete progress line to print.
    """
    # Printing while holding the lock keeps the workers' lines from interleaving.
    with _COMBO_LOCK:
        _COMBO_STATS[outcome] += 1
        print(message, flush=True)


def process_combo(city: str, cat_name: str, combo_idx: int, total: int) -> None:
    """Fetch recommendations for one city/category combo and record the outcome.

    Builds a categories mapping with only ``cat_name`` enabled. If
    ``(city, <categories as sorted JSON>)`` is already a key of ``_LLM_CACHE``
    the combo is skipped; otherwise ``get_recommendations_cached`` is called
    (6 attractions, temperature 0). Exactly one of the ``success`` /
    ``skipped`` / ``fail`` counters in ``_COMBO_STATS`` is incremented per call.

    Args:
        city: City name to request recommendations for.
        cat_name: The single category from ``CATEGORY_NAMES`` to enable.
        combo_idx: 1-based position of this combo; used only for progress output.
        total: Total number of combos; used only for progress output.

    Exceptions raised by the recommender call are caught, logged and counted
    as failures instead of being propagated.
    """
    label = f"[{combo_idx:>3}/{total}]"
    categories = {name: name == cat_name for name in CATEGORY_NAMES}
    # Serialized with sorted keys so it matches the key format checked in _LLM_CACHE.
    cat_hash = json.dumps(categories, sort_keys=True)

    if (city, cat_hash) in _LLM_CACHE:
        _log_outcome("skipped", f" {label} ⏭️ {city} / {cat_name} — already cached")
        return

    # Print a complete line (not end=" "): with concurrent workers, a dangling
    # partial line would get another combo's output spliced into it.
    with _COMBO_LOCK:
        print(f" {label} 🔍 {city} / {cat_name}...", flush=True)

    start = time.perf_counter()  # monotonic clock for measuring durations
    try:
        result = get_recommendations_cached(
            city=city,
            num_attractions=6,
            categories=categories,
            temperature=0,
        )
        items = len(result) if result else 0
    except Exception as e:
        # Best-effort batch job: one failing combo must not abort the rest.
        elapsed = time.perf_counter() - start
        _log_outcome("fail", f" {label} ❌ {city} / {cat_name} — error after {elapsed:.1f}s: {e}")
        return

    elapsed = time.perf_counter() - start
    if items:
        _log_outcome("success", f" {label} ✅ {city} / {cat_name} — {items} items in {elapsed:.1f}s")
    else:
        # Covers both a None and an empty result.
        _log_outcome("fail", f" {label} ❌ {city} / {cat_name} — no results in {elapsed:.1f}s")
def prewarm(max_workers: int = 2) -> None:
    """Pre-warm every uncached city/category combo, then persist all caches.

    Each combo in UNCATEGORIZED_CITIES x CATEGORY_NAMES is handed to
    ``process_combo`` on a thread pool. Combos are shuffled to spread load
    across the workers.

    Args:
        max_workers: Number of worker threads (default 2).

    The summary is printed and the LLM, image and geocode caches are saved in
    a ``finally`` block, so work finished before a crash or Ctrl-C is kept.
    A KeyboardInterrupt is re-raised after saving.
    """
    total_combos = len(UNCATEGORIZED_CITIES) * len(CATEGORY_NAMES)
    _COMBO_STATS["total"] = total_combos
    llm_before = len(_LLM_CACHE)
    print(f"Pre-warming caches: {len(UNCATEGORIZED_CITIES)} cities × {len(CATEGORY_NAMES)} categories = {total_combos} combos")
    print(f" Workers: {max_workers} (concurrent) — each uses random DeepSeek provider")
    print(f" Existing LLM cache entries: {llm_before}")
    print()

    combos = [(city, cat_name) for city in UNCATEGORIZED_CITIES for cat_name in CATEGORY_NAMES]
    random.shuffle(combos)

    finished = False
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # Display indices follow the shuffled order, so progress reads 1..N.
            futures = [
                pool.submit(process_combo, city, cat_name, idx, total_combos)
                for idx, (city, cat_name) in enumerate(combos, start=1)
            ]
            try:
                for future in as_completed(futures):
                    try:
                        future.result()
                    except Exception as e:
                        # process_combo already handles failures of the recommender
                        # call; anything arriving here escaped that handler, so count
                        # and report it instead of silently dropping it.
                        with _COMBO_LOCK:
                            _COMBO_STATS["fail"] += 1
                            print(f" ❌ unexpected worker error: {e!r}", flush=True)
            except KeyboardInterrupt:
                # Leaving the `with` waits for every queued task; cancel the ones
                # not yet started so Ctrl-C only waits for in-flight combos.
                for future in futures:
                    future.cancel()
                raise
        finished = True
    finally:
        # Summary and persistence run even on error/interrupt so completed
        # LLM responses are not lost.
        llm_new = len(_LLM_CACHE) - llm_before
        print()
        print("═" * 55)
        print("Pre-warm complete!" if finished else "Pre-warm interrupted — saving partial progress")
        print(f" Combos: {_COMBO_STATS['success']} succeeded, {_COMBO_STATS['skipped']} skipped, {_COMBO_STATS['fail']} failed")
        print(f" New LLM cache entries: {llm_new} (total: {len(_LLM_CACHE)})")
        _save_llm_cache()
        _save_image_cache()
        _save_geocode_cache()
        print()
        print("All caches saved to disk ✅")
if __name__ == "__main__":
prewarm()
|