"""
Catalog Enricher Service

Automatically generates rich metadata for datasets using LLM.
Enhances table descriptions and tags for better semantic search.
"""

import json
import logging
from itertools import islice
from typing import Dict, List, Any, Optional

from backend.core.llm_gateway import LLMGateway

logger = logging.getLogger(__name__)

# Upper bounds on how much catalog context is placed into a prompt.
_MAX_PROMPT_COLUMNS = 15
_MAX_SAMPLE_VALUES = 5

# Descriptions outside this length range are logged as suspicious (but kept).
_MIN_DESCRIPTION_CHARS = 20
_MAX_DESCRIPTION_CHARS = 500

# Tags outside this length range are discarded.
_MIN_TAG_CHARS = 2
_MAX_TAG_CHARS = 30

# Tags are only (re)generated when a table has fewer than this many.
_MIN_TAGS_BEFORE_SKIP = 3

# Prompt for generating semantic descriptions
DESCRIPTION_PROMPT = """Generate a concise 2-3 sentence description for this geographic dataset.

Table Name: {table_name}
Category: {category}
Columns: {columns}
Sample Column Values: {sample_values}
Row Count: {row_count}

Focus on:
1. What geographic entities it contains (districts, health facilities, roads, etc.)
2. The geographic scope (Panama, specific province, etc.)
3. Common use cases (administrative analysis, health coverage, etc.)

Return ONLY the description, no formatting or labels."""

# Prompt for generating/refining tags
TAG_PROMPT = """Suggest 5-8 relevant tags for this geographic dataset.

Table Name: {table_name}
Description: {description}
Columns: {columns}
Current Tags: {current_tags}

Rules:
1. Tags should be lowercase, single words or hyphenated
2. Include domain tags (health, education, infrastructure)
3. Include geographic tags (administrative, boundaries, points)
4. Include data type tags (census, osm, government)

Return ONLY a JSON array of strings, e.g. ["health", "facilities", "infrastructure"]"""


def _format_columns(columns: Any) -> str:
    """Render the first few column names as a comma-separated string.

    Tolerates a missing/``None`` column list and non-string entries, so that
    building a prompt never raises because of the catalog's column format.
    """
    return ", ".join(str(col) for col in list(columns or [])[:_MAX_PROMPT_COLUMNS])


def _parse_tag_response(raw: str) -> Optional[List[str]]:
    """Extract a cleaned tag list from a raw LLM reply.

    Args:
        raw: The text returned by the LLM.

    Returns:
        The lowercased, stripped, de-duplicated tags whose length is within
        the accepted range, in the order the LLM gave them; ``None`` when the
        reply parses as JSON but is not an array.

    Raises:
        json.JSONDecodeError: If no JSON value can be extracted from the reply.
    """
    text = raw.strip()

    # Unwrap a Markdown code fence (```json ... ```), keeping the first block.
    if text.startswith("```"):
        text = text.split("```")[1]
        if text[:4].lower() == "json":
            text = text[4:]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # The reply may wrap the array in prose; retry on the outermost [...].
        start, end = text.find("["), text.rfind("]")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])

    if not isinstance(parsed, list):
        return None

    cleaned: List[str] = []
    seen = set()
    for item in parsed:
        if not isinstance(item, str):
            continue
        tag = item.lower().strip()
        if _MIN_TAG_CHARS <= len(tag) <= _MAX_TAG_CHARS and tag not in seen:
            seen.add(tag)
            cleaned.append(tag)
    return cleaned


class CatalogEnricher:
    """
    Enriches catalog metadata with LLM-generated descriptions and tags.

    Can be run on-demand for new datasets or batch-run for existing ones.

    Instantiating the class always yields the same shared object: ``__new__``
    caches the first instance and ``__init__`` skips re-initialisation once
    ``initialized`` is set.
    """

    _instance: Optional["CatalogEnricher"] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every CatalogEnricher() call; only set up once.
        if self.initialized:
            return
        self.llm = LLMGateway()
        self.initialized = True

    async def generate_description(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Generate a semantic description for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict; the "columns", "category",
                "row_count" and "description" keys are read when present
            sample_values: Optional dict of column -> sample value

        Returns:
            Generated description string. If the LLM call fails or returns
            nothing, the existing "description" is returned, or a generic
            "Geographic data from <category>" string when there is none.
        """
        category = metadata.get("category", "unknown")
        row_count = metadata.get("row_count", "unknown")

        # `or` (not a .get default) so a present-but-empty description still
        # yields a usable string.
        fallback = metadata.get("description") or f"Geographic data from {category}"

        # Format sample values
        sample_str = "Not available"
        if sample_values:
            sample_str = ", ".join(
                f"{k}: {v}"
                for k, v in islice(sample_values.items(), _MAX_SAMPLE_VALUES)
            )

        prompt = DESCRIPTION_PROMPT.format(
            table_name=table_name,
            category=category,
            columns=_format_columns(metadata.get("columns")),  # Limit columns
            sample_values=sample_str,
            row_count=row_count
        )

        # Best-effort: any gateway failure degrades to the fallback text.
        try:
            response = await self.llm.generate_response(prompt)
            description = response.strip()
        except Exception as e:
            logger.error("Failed to generate description for %s: %s", table_name, e)
            return fallback

        if not description:
            logger.warning(
                "LLM returned an empty description for %s; using fallback",
                table_name,
            )
            return fallback

        # Basic validation: flag, but still return, odd-length descriptions.
        if not _MIN_DESCRIPTION_CHARS <= len(description) <= _MAX_DESCRIPTION_CHARS:
            logger.warning(
                "Generated description for %s seems unusual: %d chars",
                table_name,
                len(description),
            )

        return description

    async def generate_tags(
        self,
        table_name: str,
        metadata: Dict[str, Any]
    ) -> List[str]:
        """
        Generate or refine tags for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict; "columns", "tags" and
                "semantic_description" (falling back to "description")
                are read when present

        Returns:
            List of tag strings suggested by the LLM (lowercased, stripped,
            de-duplicated, 2-30 characters each). If the LLM call fails or its
            reply is not a JSON array, the current tags are returned instead.
        """
        description = metadata.get("semantic_description") or metadata.get("description", "")
        current_tags = metadata.get("tags") or []

        prompt = TAG_PROMPT.format(
            table_name=table_name,
            description=description,
            columns=_format_columns(metadata.get("columns")),
            current_tags=current_tags
        )

        # Best-effort: gateway errors and unparseable replies keep current tags.
        try:
            response = await self.llm.generate_response(prompt)
            tags = _parse_tag_response(response)
        except Exception as e:
            logger.error("Failed to generate tags for %s: %s", table_name, e)
            return current_tags

        if tags is None:
            logger.warning(
                "LLM tag response for %s was not a JSON array; keeping current tags",
                table_name,
            )
            return current_tags

        return tags

    async def enrich_table(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None,
        force_refresh: bool = False
    ) -> Dict[str, Any]:
        """
        Fully enrich a table's metadata with description and tags.

        Args:
            table_name: Name of the table
            metadata: Current catalog metadata (not modified)
            sample_values: Optional sample data for context
            force_refresh: If True, regenerate even if already enriched

        Returns:
            Updated metadata dict (a shallow copy of ``metadata``)
        """
        updated = metadata.copy()
        existing_tags = list(metadata.get("tags") or [])

        # Generate description if missing or forced
        if force_refresh or not metadata.get("semantic_description"):
            logger.info("Generating semantic description for %s...", table_name)
            description = await self.generate_description(table_name, metadata, sample_values)
            updated["semantic_description"] = description

        # Generate/refine tags only when forced or when the table has few tags
        if force_refresh or len(existing_tags) < _MIN_TAGS_BEFORE_SKIP:
            logger.info("Generating tags for %s...", table_name)
            # Pass `updated` so a freshly generated description informs the tags.
            tags = await self.generate_tags(table_name, updated)

            # Merge with existing, deduplicate. dict.fromkeys keeps first-seen
            # order, so the stored tag list is stable between runs.
            updated["tags"] = list(dict.fromkeys([*existing_tags, *tags]))

        return updated


# Singleton accessor
_catalog_enricher: Optional[CatalogEnricher] = None


def get_catalog_enricher() -> CatalogEnricher:
    """Get the singleton catalog enricher instance."""
    global _catalog_enricher
    if _catalog_enricher is None:
        _catalog_enricher = CatalogEnricher()
    return _catalog_enricher