Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Schema Cache — Discovers the product index schema from OpenSearch and caches it. | |
| Runs on a schedule (daily at 7am) to keep the Query Understanding Agent prompt | |
| in sync with the actual index mapping without per-query overhead. | |
| """ | |
| import json | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Optional | |
| from search_personalization.data_loader import get_opensearch_client | |
# Name of the OpenSearch index whose mapping is discovered.
_INDEX_NAME = "products"

# Mapping types that are considered filterable.
_FILTERABLE_TYPES = {"keyword", "float", "integer", "boolean"}

# Fields skipped during discovery regardless of their mapping type.
_EXCLUDED_FIELDS = {"id", "image", "image_binary", "product_description_vector"}

# Number of terms requested per keyword field in the terms aggregation.
_MAX_TERMS = 50

# On-disk location of the cached schema, in a directory next to this module.
_CACHE_FILE = Path(__file__).parent.joinpath("schema_cache", "products_schema.json")
def discover_schema() -> dict:
    """Fetch index mapping and top terms for filterable fields from OpenSearch.

    Returns:
        A dict with three keys: "index" (the configured index name),
        "discovered_at" (ISO string from ``datetime.now()``) and "fields".
        "fields" maps each non-excluded field whose mapping type is in
        ``_FILTERABLE_TYPES`` to ``{"type": ..., "values": ...}``; "values" is
        the list of top term keys for keyword fields and None otherwise.

    Raises:
        KeyError: if the mapping response contains no index entry at all.
    """
    client = get_opensearch_client()

    mapping_resp = client.indices.get_mapping(index=_INDEX_NAME)
    # The response is keyed by concrete index name. When _INDEX_NAME is an
    # alias the key differs, so fall back to the first returned entry.
    if _INDEX_NAME in mapping_resp:
        index_entry = mapping_resp[_INDEX_NAME]
    elif mapping_resp:
        index_entry = next(iter(mapping_resp.values()))
    else:
        raise KeyError(_INDEX_NAME)
    # An index without any mapped fields has no "properties" key.
    properties = index_entry["mappings"].get("properties", {})

    # Keep only fields that are not excluded and have a filterable type.
    schema = {}
    for field_name, field_def in properties.items():
        if field_name in _EXCLUDED_FIELDS:
            continue
        field_type = field_def.get("type")
        if field_type in _FILTERABLE_TYPES:
            schema[field_name] = {"type": field_type, "values": None}

    keyword_fields = [name for name, spec in schema.items() if spec["type"] == "keyword"]

    # One search request with a terms aggregation per keyword field;
    # "size": 0 suppresses the document hits themselves.
    if keyword_fields:
        aggs = {f: {"terms": {"field": f, "size": _MAX_TERMS}} for f in keyword_fields}
        agg_resp = client.search(index=_INDEX_NAME, body={"size": 0, "aggs": aggs})
        for field_name in keyword_fields:
            buckets = agg_resp["aggregations"][field_name].get("buckets", [])
            schema[field_name]["values"] = [b["key"] for b in buckets]

    return {"index": _INDEX_NAME, "discovered_at": datetime.now().isoformat(), "fields": schema}
def refresh_cache() -> dict:
    """Run discovery and write the result to the disk cache.

    The JSON is first written to a sibling temporary file and then moved over
    the cache file, so a reader running while the refresh is in progress does
    not see a partially written document.

    Returns:
        The freshly discovered schema dict.
    """
    schema = discover_schema()
    _CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the final rename stays on one filesystem.
    tmp_file = _CACHE_FILE.with_name(_CACHE_FILE.name + ".tmp")
    tmp_file.write_text(json.dumps(schema, indent=2))
    tmp_file.replace(_CACHE_FILE)
    return schema
def get_cached_schema() -> Optional[dict]:
    """Load cached schema from disk.

    Returns:
        The parsed schema dict, or None if the cache file does not exist or
        does not contain valid JSON (for example, a truncated file).
    """
    # Read directly instead of checking exists() first: the file may be
    # removed between the check and the read.
    try:
        raw = _CACHE_FILE.read_text()
    except FileNotFoundError:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        # An unreadable cache is treated as a cache miss rather than a crash.
        print(f"[schema_cache] Ignoring unreadable cache file: {e}")
        return None
def get_schema_prompt_block() -> str:
    """Format the cached schema as a text block for the Query Agent prompt.

    Returns:
        An empty string when no schema is cached; otherwise a header line
        followed by one "- name (type)" bullet per field, with the known
        values appended after a colon when the field has any.
    """
    cached = get_cached_schema()
    if not cached:
        return ""

    def _bullet(name, spec):
        known = spec["values"]
        base = f"- {name} ({spec['type']})"
        if not known:
            return base
        return f"{base}: {', '.join(str(v) for v in known)}"

    header = f"PRODUCT INDEX SCHEMA (discovered from '{cached['index']}' index, use these exact values for filters):"
    bullets = [_bullet(name, spec) for name, spec in cached["fields"].items()]
    return "\n".join([header, *bullets])
def _run_scheduler(hour: int = 7, minute: int = 0):
    """Loop forever, refreshing the cache once a day at ``hour``:``minute``.

    Intended to run in a background thread. Each iteration sleeps until the
    next occurrence of the target time (by the local clock from
    ``datetime.now()``), then calls refresh_cache(). Failures are printed and
    the loop continues.
    """
    one_day = timedelta(days=1)
    while True:
        current = datetime.now()
        delay = current.replace(hour=hour, minute=minute, second=0, microsecond=0) - current
        # Today's slot is already past (or is exactly now): aim for tomorrow.
        if delay <= timedelta(0):
            delay += one_day
        time.sleep(delay.total_seconds())
        try:
            refresh_cache()
            print(f"[schema_cache] Refreshed at {datetime.now().isoformat()}")
        except Exception as e:
            print(f"[schema_cache] Refresh failed: {e}")
def start_scheduler(hour: int = 7, minute: int = 0):
    """Launch the daily schema refresh loop in a daemon thread.

    Args:
        hour: Hour of day passed to the scheduler loop.
        minute: Minute of the hour passed to the scheduler loop.
    """
    # daemon=True so the refresh loop does not keep the process alive on exit.
    threading.Thread(
        target=_run_scheduler,
        args=(hour, minute),
        daemon=True,
    ).start()