prasadnu's picture
feat: add Search Personalization demo module
b4d5c9a
Raw
History Blame
3.91 kB
"""
Schema Cache — Discovers the product index schema from OpenSearch and caches it.
Runs on a schedule (daily at 7am) to keep the Query Understanding Agent prompt
in sync with the actual index mapping without per-query overhead.
"""
import json
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from search_personalization.data_loader import get_opensearch_client
# On-disk location of the cached schema, in a "schema_cache" directory next to this module.
_CACHE_FILE = Path(__file__).parent / "schema_cache" / "products_schema.json"
# Mapping types that discover_schema() keeps; fields of any other type are skipped.
_FILTERABLE_TYPES = {"keyword", "float", "integer", "boolean"}
# Name of the OpenSearch index whose mapping and terms are inspected.
_INDEX_NAME = "products"
# Field names that are never included in the schema, regardless of their type.
_EXCLUDED_FIELDS = {"id", "image", "image_binary", "product_description_vector"}
# Maximum number of term buckets requested per keyword field.
_MAX_TERMS = 50
def discover_schema() -> dict:
    """Fetch index mapping and top terms for filterable fields from OpenSearch.

    Returns:
        A dict with three keys:
        - "index": the index name that was queried.
        - "discovered_at": ISO-formatted local timestamp of the discovery.
        - "fields": maps each filterable field name to
          {"type": <mapping type>, "values": <list of top terms or None>}.
          "values" is populated only for keyword fields.

    Raises:
        KeyError: if the mapping response contains no index entry at all.
    """
    client = get_opensearch_client()

    mapping_resp = client.indices.get_mapping(index=_INDEX_NAME)
    # The response is keyed by concrete index name. When _INDEX_NAME is an
    # alias the key differs from the name we asked for, so fall back to the
    # first entry returned instead of failing with a KeyError.
    try:
        index_entry = mapping_resp[_INDEX_NAME]
    except KeyError:
        index_entry = next(iter(mapping_resp.values()), None)
        if index_entry is None:
            raise
    # An index that has no mapped fields yet has no "properties" key.
    properties = index_entry["mappings"].get("properties", {})

    # Keep only fields whose type can be used in a filter.
    schema = {}
    keyword_fields = []
    for field_name, field_def in properties.items():
        if field_name in _EXCLUDED_FIELDS:
            continue
        # Object/nested fields carry no "type" key, so .get() yields None
        # and they are skipped by the membership test below.
        field_type = field_def.get("type")
        if field_type not in _FILTERABLE_TYPES:
            continue
        schema[field_name] = {"type": field_type, "values": None}
        if field_type == "keyword":
            keyword_fields.append(field_name)

    # One search request with a terms aggregation per keyword field;
    # "size": 0 suppresses document hits so only buckets come back.
    if keyword_fields:
        aggs = {f: {"terms": {"field": f, "size": _MAX_TERMS}} for f in keyword_fields}
        agg_resp = client.search(index=_INDEX_NAME, body={"size": 0, "aggs": aggs})
        for field_name in keyword_fields:
            buckets = agg_resp["aggregations"][field_name].get("buckets", [])
            schema[field_name]["values"] = [b["key"] for b in buckets]

    return {"index": _INDEX_NAME, "discovered_at": datetime.now().isoformat(), "fields": schema}
def refresh_cache() -> dict:
    """Run discovery and write the result to the disk cache.

    The JSON is written to a temporary sibling file and then renamed over
    the cache file, so a concurrent reader never observes a partially
    written cache.

    Returns:
        The freshly discovered schema dict (the same object that was
        serialized to disk).
    """
    schema = discover_schema()
    _CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the rename stays on one filesystem.
    tmp_file = _CACHE_FILE.with_name(_CACHE_FILE.name + ".tmp")
    tmp_file.write_text(json.dumps(schema, indent=2))
    tmp_file.replace(_CACHE_FILE)
    return schema
def get_cached_schema() -> Optional[dict]:
    """Load the cached schema from disk.

    Returns:
        The parsed schema dict, or None if the cache file does not exist
        or does not contain valid JSON.
    """
    # Read directly instead of checking exists() first: the file can
    # disappear between the check and the read.
    try:
        raw = _CACHE_FILE.read_text()
    except FileNotFoundError:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        # A truncated or corrupt cache is treated as "not cached" so that
        # callers degrade gracefully until the next refresh rewrites it.
        print(f"[schema_cache] Ignoring unreadable cache file: {e}")
        return None
def get_schema_prompt_block() -> str:
    """Render the cached schema as a prompt block for the Query Agent.

    Returns:
        A newline-joined string: one header line naming the index, then
        one "- name (type)" line per field, with the known values appended
        after a colon when the field has any. Returns an empty string when
        no schema is cached.
    """
    cached = get_cached_schema()
    if not cached:
        return ""

    def render_field(name, spec):
        # Fields whose "values" entry is None or empty get the short form.
        values = spec["values"]
        if not values:
            return f"- {name} ({spec['type']})"
        return f"- {name} ({spec['type']}): {', '.join(str(v) for v in values)}"

    header = f"PRODUCT INDEX SCHEMA (discovered from '{cached['index']}' index, use these exact values for filters):"
    field_lines = [render_field(name, spec) for name, spec in cached["fields"].items()]
    return "\n".join([header, *field_lines])
def _run_scheduler(hour: int = 7, minute: int = 0):
    """Loop forever, refreshing the cache once a day at hour:minute local time.

    Intended to be run as a thread target; it never returns. A failed
    refresh is reported on stdout and the loop carries on to the next day.

    Args:
        hour: Hour of day (local time) at which to refresh.
        minute: Minute of the hour at which to refresh.
    """
    one_day = timedelta(days=1)
    while True:
        current = datetime.now()
        next_run = current.replace(hour=hour, minute=minute, second=0, microsecond=0)
        # Today's slot has already passed (or is right now): aim for tomorrow.
        if next_run <= current:
            next_run = next_run + one_day
        time.sleep((next_run - current).total_seconds())
        try:
            refresh_cache()
            print(f"[schema_cache] Refreshed at {datetime.now().isoformat()}")
        except Exception as err:
            # Keep the scheduler alive; the next iteration retries tomorrow.
            print(f"[schema_cache] Refresh failed: {err}")
def start_scheduler(hour: int = 7, minute: int = 0):
    """Start the daily schema refresh as a daemon thread.

    Args:
        hour: Hour of day (0-23, local time) at which to refresh.
        minute: Minute of the hour (0-59) at which to refresh.

    Raises:
        ValueError: if hour or minute is out of range.
    """
    # Validate here rather than in the worker: an out-of-range value would
    # otherwise only raise inside the daemon thread, which would die while
    # the caller believes the scheduler is running.
    if not 0 <= hour <= 23:
        raise ValueError(f"hour must be in 0..23, got {hour!r}")
    if not 0 <= minute <= 59:
        raise ValueError(f"minute must be in 0..59, got {minute!r}")
    t = threading.Thread(
        target=_run_scheduler,
        args=(hour, minute),
        name="schema-cache-scheduler",
        daemon=True,
    )
    t.start()