"""
Schema Cache — Discovers the product index schema from OpenSearch and caches it.

Runs on a schedule (daily at 7am by default) to keep the Query Understanding Agent
prompt in sync with the actual index mapping without per-query overhead.
"""

import contextlib
import json
import os
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from search_personalization.data_loader import get_opensearch_client

_CACHE_FILE = Path(__file__).parent / "schema_cache" / "products_schema.json"
# Mapping types that are exposed to the Query Agent as filterable fields.
_FILTERABLE_TYPES = {"keyword", "float", "integer", "boolean"}
_INDEX_NAME = "products"
# Fields never surfaced in the schema prompt.
_EXCLUDED_FIELDS = {"id", "image", "image_binary", "product_description_vector"}
# Maximum number of distinct values collected per keyword field.
_MAX_TERMS = 50
# Upper bound on a single sleep in the scheduler. Sleeping in bounded chunks lets
# the wait re-sync with the wall clock (DST shifts, NTP steps) instead of trusting
# one long monotonic sleep computed from naive wall-clock arithmetic.
_MAX_SLEEP_SECONDS = 300


def _extract_properties(mapping_resp: dict) -> dict:
    """Return the top-level ``properties`` mapping from a get_mapping response.

    The response is keyed by concrete index name. When ``_INDEX_NAME`` is an alias,
    the entry is keyed by the backing index instead, and the first entry is used.

    Raises:
        KeyError: if the response contains no index entry at all.
    """
    index_mapping = mapping_resp.get(_INDEX_NAME)
    if index_mapping is None:
        if not mapping_resp:
            raise KeyError(_INDEX_NAME)
        index_mapping = next(iter(mapping_resp.values()))
    # An index created without an explicit mapping has no "properties" key.
    return index_mapping.get("mappings", {}).get("properties", {})


def discover_schema() -> dict:
    """Fetch the index mapping and top terms for filterable fields from OpenSearch.

    Returns:
        ``{"index": ..., "discovered_at": <ISO timestamp>, "fields": {...}}``.
        Each field maps to ``{"type": <mapping type>, "values": <list or None>}``.
        ``values`` is filled (at most ``_MAX_TERMS`` terms) only for keyword fields.
    """
    client = get_opensearch_client()

    mapping_resp = client.indices.get_mapping(index=_INDEX_NAME)
    properties = _extract_properties(mapping_resp)

    schema = {}
    keyword_fields = []
    for field_name, field_def in properties.items():
        if field_name in _EXCLUDED_FIELDS:
            continue
        # Fields without a "type" (object fields) or with a non-filterable type are skipped.
        field_type = field_def.get("type")
        if field_type in _FILTERABLE_TYPES:
            schema[field_name] = {"type": field_type, "values": None}
            if field_type == "keyword":
                keyword_fields.append(field_name)

    if keyword_fields:
        # One size-0 search carrying a terms aggregation per keyword field.
        aggs = {f: {"terms": {"field": f, "size": _MAX_TERMS}} for f in keyword_fields}
        agg_resp = client.search(index=_INDEX_NAME, body={"size": 0, "aggs": aggs})
        for field_name in keyword_fields:
            buckets = agg_resp["aggregations"][field_name].get("buckets", [])
            schema[field_name]["values"] = [b["key"] for b in buckets]

    return {"index": _INDEX_NAME, "discovered_at": datetime.now().isoformat(), "fields": schema}


def refresh_cache() -> dict:
    """Run discovery and write the result to the disk cache.

    The JSON is written to a temporary file in the cache directory and then moved
    over the cache file. Concurrent readers therefore never observe a partially
    written file.

    Returns:
        The freshly discovered schema dict.
    """
    schema = discover_schema()
    _CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Unique per process/thread so concurrent refreshes don't share a temp file;
    # same directory so the final rename stays on one filesystem.
    tmp_path = _CACHE_FILE.with_name(
        f"{_CACHE_FILE.name}.{os.getpid()}.{threading.get_ident()}.tmp"
    )
    try:
        tmp_path.write_text(json.dumps(schema, indent=2), encoding="utf-8")
        tmp_path.replace(_CACHE_FILE)
    except BaseException:
        with contextlib.suppress(OSError):
            tmp_path.unlink()
        raise
    return schema


def get_cached_schema() -> Optional[dict]:
    """Load the cached schema from disk.

    Returns:
        The cached schema dict. Returns None if no cache file exists or if it
        cannot be parsed as JSON (a warning is printed in the latter case).
    """
    try:
        raw = _CACHE_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"[schema_cache] Ignoring unreadable cache file {_CACHE_FILE}: {e}")
        return None


def get_schema_prompt_block() -> str:
    """Render the cached schema as a prompt block for the Query Agent.

    Returns:
        A newline-joined block with one line per field. Keyword fields with
        discovered values list them comma-separated. Returns "" when no schema
        is cached.
    """
    schema = get_cached_schema()
    if not schema:
        return ""
    lines = [f"PRODUCT INDEX SCHEMA (discovered from '{schema['index']}' index, use these exact values for filters):"]
    for field_name, field_def in schema["fields"].items():
        if field_def["values"]:
            lines.append(f"- {field_name} ({field_def['type']}): {', '.join(str(v) for v in field_def['values'])}")
        else:
            lines.append(f"- {field_name} ({field_def['type']})")
    return "\n".join(lines)


def _next_run(now: datetime, hour: int, minute: int) -> datetime:
    """Return the first ``hour:minute`` (local wall-clock time) strictly after *now*."""
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)
    return target


def _run_scheduler(hour: int = 7, minute: int = 0):
    """Background loop that refreshes the cache daily at ``hour:minute`` local time.

    Refresh failures are printed and the loop continues with the next day's run.
    """
    while True:
        target = _next_run(datetime.now(), hour, minute)
        # time.sleep uses a monotonic clock while `target` is wall-clock time.
        # Sleep in bounded chunks and re-check the wall clock, so a DST shift or
        # clock step neither fires the refresh early (which would cause a second
        # refresh when the same target is recomputed) nor delays it by hours.
        while True:
            remaining = (target - datetime.now()).total_seconds()
            if remaining <= 0:
                break
            time.sleep(min(remaining, _MAX_SLEEP_SECONDS))
        try:
            refresh_cache()
            print(f"[schema_cache] Refreshed at {datetime.now().isoformat()}")
        except Exception as e:
            # Top-level boundary of the background thread: report and keep scheduling.
            print(f"[schema_cache] Refresh failed: {e}")


def start_scheduler(hour: int = 7, minute: int = 0) -> threading.Thread:
    """Start the daily schema refresh as a daemon thread.

    Args:
        hour: Local hour of the daily refresh (0-23).
        minute: Minute of the daily refresh (0-59).

    Returns:
        The started daemon thread.

    Raises:
        ValueError: if ``hour`` or ``minute`` is out of range. Validated here so
            the caller sees the error, rather than it killing the background thread.
    """
    if not 0 <= hour <= 23:
        raise ValueError(f"hour must be in 0..23, got {hour!r}")
    if not 0 <= minute <= 59:
        raise ValueError(f"minute must be in 0..59, got {minute!r}")
    t = threading.Thread(target=_run_scheduler, args=(hour, minute), daemon=True)
    t.start()
    return t