| |
| """ |
| Cache health check — scans and repairs all 3 disk caches. |
| |
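| Checks every cached LLM recommendation for: |
| 🖼 Missing image URLs → re-runs enrichment (Tier 1-6, including city fallback) |
| 📍 Missing/bad coordinates → re-runs geocode verification |
| 🗺 Wrong-city attractions → flags them (note: the `coordinates_out_of_city` counter exists, but no check in this script updates it yet) |
| 🔁 Duplicate image URLs within a city → re-runs enrichment for the attractions sharing the photo |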
| |
| Usage: |
| cd roamify && python scripts/check_cache.py [--report-only] |
| """ |
|
|
| import os |
| import sys |
| import time |
| import json |
| import argparse |
|
|
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) |
|
|
| from dotenv import load_dotenv |
| load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "..", ".env"), override=True) |
|
|
| from services.recommender import ( |
| _LLM_CACHE, |
| _IMAGE_CACHE, |
| _GEOCODE_CACHE, |
| _save_llm_cache, |
| _save_image_cache, |
| _save_geocode_cache, |
| _enrich_one_item, |
| _verify_coordinates, |
| ) |
|
|
|
|
def _coords_are_bad(lat, lon) -> bool:
    """Return True when either coordinate is missing, blank, zero, or non-numeric."""
    for value in (lat, lon):
        if value is None or str(value).strip() == "":
            return True
        try:
            if float(value) == 0:
                return True
        except (TypeError, ValueError):
            # Junk such as "N/A" must count as a bad coordinate instead of
            # aborting the whole scan with an uncaught exception.
            return True
    return False


def _refetch_duplicate_image(dup_name, city, url, keys) -> bool:
    """Re-run enrichment for *dup_name* in *city* wherever it still carries *url*.

    At most one matching item per cache entry is re-enriched. Returns True
    when at least one of them ended up with a different, non-empty image URL.
    """
    refetched = False
    for key in keys:
        if key[0] != city:
            continue
        for item in _LLM_CACHE[key] or ():
            # Only touch items that actually hold the shared photo; a
            # same-named item with its own image needs no repair.
            if item.get("name", "???") != dup_name or item.get("image_url", "") != url:
                continue
            _enrich_one_item(item, city=city)
            new_url = item.get("image_url", "")
            if new_url and new_url != url:
                refetched = True
                print(f" ✅ Re-fetched unique image for {dup_name}")
            else:
                print(f" ⚠️ Still using fallback for {dup_name}")
            break
    return refetched


def check_cache(report_only: bool = False) -> dict[str, int]:
    """Scan all LLM cache entries, find and repair issues. Returns summary.

    Two passes are made over ``_LLM_CACHE`` (keys are indexed as
    ``key[0] == city``; values are lists of item dicts):

    1. Per item: a missing ``image_url`` is re-enriched via
       ``_enrich_one_item`` and missing/zero/non-numeric coordinates are
       re-verified via ``_verify_coordinates``.
    2. Per city: image URLs shared by more than one distinct attraction name
       are reported and, unless ``report_only`` is set, re-fetched.

    Args:
        report_only: When True, issues are printed and counted but no cache
            is modified and no enrichment/geocoding helper is called.

    Returns:
        Counter dict. ``images_fixed`` counts missing images that were filled
        plus duplicated attractions that received a new unique image.
        ``coordinates_out_of_city`` is initialised but never updated here.
    """
    stats = {
        "entries_scanned": 0,
        "items_checked": 0,
        "images_missing": 0,
        "images_fixed": 0,
        "coordinates_missing": 0,
        "coordinates_fixed": 0,
        "coordinates_out_of_city": 0,
    }

    # Work from a snapshot of the keys (presumably so repair helpers that
    # touch the cache cannot disturb iteration).
    keys = list(_LLM_CACHE.keys())

    # Pass 1: missing images and bad coordinates, item by item.
    for key in keys:
        city = key[0]
        items = _LLM_CACHE[key]
        if not items:
            continue

        stats["entries_scanned"] += 1
        fixed_this_entry = False

        for item in items:
            stats["items_checked"] += 1
            name = item.get("name", "???")

            if not item.get("image_url", ""):
                stats["images_missing"] += 1
                # Reported in both modes so --report-only lists the findings.
                print(f" 🖼 Missing image: {city} → {name}")
                if not report_only:
                    # Drop any cached (empty) lookup before re-enriching.
                    img_cache_key = (name, city, "")
                    if img_cache_key in _IMAGE_CACHE:
                        del _IMAGE_CACHE[img_cache_key]
                    _enrich_one_item(item, city=city)
                    if item.get("image_url"):
                        stats["images_fixed"] += 1
                        fixed_this_entry = True
                        print(" ✅ Fixed")
                    else:
                        print(" ⚠️ Still no image (all tiers exhausted)")
                    time.sleep(0.5)  # pause between lookups

            lat = item.get("latitude")
            lon = item.get("longitude")
            if _coords_are_bad(lat, lon):
                stats["coordinates_missing"] += 1
                print(f" 📍 Bad coords: {city} → {name} (lat={lat}, lon={lon})")
                if not report_only:
                    verified = _verify_coordinates([item], city)
                    # Accept the result only if it passes the same validity
                    # rule that flagged the item in the first place.
                    if verified and not _coords_are_bad(
                        verified[0].get("latitude"), verified[0].get("longitude")
                    ):
                        stats["coordinates_fixed"] += 1
                        fixed_this_entry = True
                        item.update(verified[0])
                        print(f" ✅ Fixed → ({item['latitude']}, {item['longitude']})")
                    else:
                        print(" ⚠️ Still bad — skipping repair")
                    time.sleep(1.0)  # pause between lookups

        if fixed_this_entry:
            print()

    # Pass 2: group distinct attraction names by image URL within each city.
    # The same attraction cached under several keys is listed only once, so
    # it is not mistaken for a duplicate of itself.
    city_images: dict[str, dict[str, list[str]]] = {}
    for key in keys:
        items = _LLM_CACHE[key]
        if not items:
            continue
        url_map = city_images.setdefault(key[0], {})
        for item in items:
            url = item.get("image_url", "")
            if not url:
                continue
            sharing = url_map.setdefault(url, [])
            name = item.get("name", "???")
            if name not in sharing:
                sharing.append(name)

    for city, url_map in city_images.items():
        for url, names in url_map.items():
            if len(names) < 2:
                continue

            print(f" 🔁 Duplicate image in {city}: {len(names)} attractions share the same photo")
            for dup_name in names:
                print(f" → {dup_name}")
            if report_only:
                continue

            for dup_name in names:
                # Forget the cached shared URL so enrichment looks again.
                img_cache_key = (dup_name, city, "")
                if img_cache_key in _IMAGE_CACHE and _IMAGE_CACHE[img_cache_key] == url:
                    del _IMAGE_CACHE[img_cache_key]
                # Count a fix only when a new, different image was obtained.
                if _refetch_duplicate_image(dup_name, city, url, keys):
                    stats["images_fixed"] += 1
                time.sleep(0.5)  # pause between lookups

    return stats
|
|
|
|
def main():
    """Command-line entry point: run the cache check and print a summary.

    Parses ``--report-only``, prints the sizes of the three caches before and
    after ``check_cache`` runs, and writes the caches back to disk unless
    ``--report-only`` was given.
    """
    cli = argparse.ArgumentParser(description="Cache health check for Roamify")
    cli.add_argument(
        "--report-only",
        action="store_true",
        help="Only report issues, don't fix them",
    )
    report_only = cli.parse_args().report_only

    # Cache sizes before anything is touched, for the before/after comparison.
    llm_start = len(_LLM_CACHE)
    img_start = len(_IMAGE_CACHE)
    geo_start = len(_GEOCODE_CACHE)

    if report_only:
        action = "Reporting"
    else:
        action = "Scanning & repairing"
    print(f"{action} caches...")
    print(f" LLM entries: {llm_start}")
    print(f" Image entries: {img_start}")
    print(f" Geocode entries: {geo_start}")
    print()

    summary = check_cache(report_only=report_only)

    print()
    print("═" * 55)
    print("Cache health check complete!")
    print(f" Entries scanned: {summary['entries_scanned']}")
    print(f" Items checked: {summary['items_checked']}")
    print(f" Missing images: {summary['images_missing']} → fixed: {summary['images_fixed']}")
    print(f" Bad coords: {summary['coordinates_missing']} → fixed: {summary['coordinates_fixed']}")

    llm_end = len(_LLM_CACHE)
    img_end = len(_IMAGE_CACHE)
    geo_end = len(_GEOCODE_CACHE)

    print()
    print(f" LLM entries: {llm_start} → {llm_end}")
    print(f" Image entries: {img_start} → {img_end}")
    print(f" Geocode entries: {geo_start} → {geo_end}")

    # Report-only runs never write anything back.
    if report_only:
        return

    _save_llm_cache()
    _save_image_cache()
    _save_geocode_cache()
    print()
    print("All caches saved to disk ✅")
|
|
|
# Run the health check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|