# GeoQuery / backend/core/catalog_enricher.py
# GerardCB - Deploy to Spaces (Final Clean) - 4851501
"""
Catalog Enricher Service
Automatically generates rich metadata for datasets using LLM.
Enhances table descriptions and tags for better semantic search.
"""
import json
import logging
from typing import Dict, List, Any, Optional

from backend.core.llm_gateway import LLMGateway
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Template for the "describe this dataset" request. It is filled with
# str.format using the keys table_name, category, columns, sample_values
# and row_count. One tuple entry per line of the final prompt.
DESCRIPTION_PROMPT = "\n".join((
    "Generate a concise 2-3 sentence description for this geographic dataset.",
    "Table Name: {table_name}",
    "Category: {category}",
    "Columns: {columns}",
    "Sample Column Values: {sample_values}",
    "Row Count: {row_count}",
    "Focus on:",
    "1. What geographic entities it contains (districts, health facilities, roads, etc.)",
    "2. The geographic scope (Panama, specific province, etc.)",
    "3. Common use cases (administrative analysis, health coverage, etc.)",
    "Return ONLY the description, no formatting or labels.",
))
# Template for the "suggest tags" request. It is filled with str.format
# using the keys table_name, description, columns and current_tags.
# One tuple entry per line of the final prompt.
TAG_PROMPT = "\n".join((
    "Suggest 5-8 relevant tags for this geographic dataset.",
    "Table Name: {table_name}",
    "Description: {description}",
    "Columns: {columns}",
    "Current Tags: {current_tags}",
    "Rules:",
    "1. Tags should be lowercase, single words or hyphenated",
    "2. Include domain tags (health, education, infrastructure)",
    "3. Include geographic tags (administrative, boundaries, points)",
    "4. Include data type tags (census, osm, government)",
    'Return ONLY a JSON array of strings, e.g. ["health", "facilities", "infrastructure"]',
))
class CatalogEnricher:
    """
    Enriches catalog metadata with LLM-generated descriptions and tags.

    Can be run on-demand for new datasets or batch-run for existing ones.

    The class is a singleton: ``CatalogEnricher()`` always returns the same
    instance, and the LLM gateway is constructed only on the first call.
    """

    _instance = None

    # Limits on how much metadata is embedded into a prompt.
    _MAX_PROMPT_COLUMNS = 15
    _MAX_SAMPLE_VALUES = 5

    # Accepted tag length (inclusive), applied after lowercasing/stripping.
    _MIN_TAG_LEN = 2
    _MAX_TAG_LEN = 30

    # Tables with fewer existing tags than this get LLM-suggested tags.
    _MIN_EXISTING_TAGS = 3

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every CatalogEnricher() call; this flag lets
            # it skip re-initialisation after the first one.
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        if self.initialized:
            return
        self.llm = LLMGateway()
        self.initialized = True

    @classmethod
    def _format_columns(cls, metadata: Dict[str, Any]) -> str:
        """Return the first few column names of *metadata* as a comma-separated string."""
        # ``or []`` also covers a "columns" key that is present but None.
        columns = metadata.get("columns") or []
        # str() keeps the join from failing on non-string column entries.
        return ", ".join(str(col) for col in columns[:cls._MAX_PROMPT_COLUMNS])

    @classmethod
    def _parse_tags(cls, response: str) -> Optional[List[str]]:
        """
        Parse an LLM reply that should contain a JSON array of tag strings.

        Args:
            response: Raw reply text, optionally wrapped in a Markdown code fence

        Returns:
            Cleaned tags (lowercased, stripped, length-checked, de-duplicated
            in first-seen order), or None when the reply is valid JSON but
            not an array.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON
        """
        text = response.strip()
        if text.startswith("```"):
            # Keep only what sits between the first pair of fences.
            text = text.split("```")[1]
            # Drop an optional language tag such as "json" or "JSON".
            if text[:4].lower() == "json":
                text = text[4:]

        parsed = json.loads(text)
        if not isinstance(parsed, list):
            return None

        normalized = (tag.lower().strip() for tag in parsed if isinstance(tag, str))
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(dict.fromkeys(
            tag for tag in normalized
            if cls._MIN_TAG_LEN <= len(tag) <= cls._MAX_TAG_LEN
        ))

    @staticmethod
    def _merge_tags(existing: List[str], new: List[str]) -> List[str]:
        """Return *existing* followed by unseen entries of *new*, without duplicates."""
        # A set union would make the resulting order arbitrary between runs.
        return list(dict.fromkeys([*existing, *new]))

    async def generate_description(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Generate a semantic description for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict
            sample_values: Optional dict of column -> sample value

        Returns:
            Generated description string. If the LLM call fails, the existing
            ``description`` from *metadata* is returned, or a generic
            category-based sentence when there is none.
        """
        category = metadata.get("category", "unknown")
        row_count = metadata.get("row_count", "unknown")

        # Format sample values
        sample_str = "Not available"
        if sample_values:
            shown = list(sample_values.items())[:self._MAX_SAMPLE_VALUES]
            sample_str = ", ".join(f"{k}: {v}" for k, v in shown)

        prompt = DESCRIPTION_PROMPT.format(
            table_name=table_name,
            category=category,
            columns=self._format_columns(metadata),
            sample_values=sample_str,
            row_count=row_count
        )

        try:
            response = await self.llm.generate_response(prompt)
            description = response.strip()
        except Exception as e:
            # Enrichment is best-effort: log and fall back instead of raising.
            logger.error("Failed to generate description for %s: %s", table_name, e)
            return metadata.get("description") or f"Geographic data from {category}"

        # Basic validation: flag suspicious lengths but still return the text.
        if len(description) < 20 or len(description) > 500:
            logger.warning(
                "Generated description for %s seems unusual: %s chars",
                table_name, len(description)
            )
        return description

    async def generate_tags(
        self,
        table_name: str,
        metadata: Dict[str, Any]
    ) -> List[str]:
        """
        Generate or refine tags for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict

        Returns:
            List of tag strings. The tags already present in *metadata* are
            returned when the LLM call fails or its reply cannot be parsed
            as a JSON array.
        """
        description = metadata.get("semantic_description") or metadata.get("description", "")
        # ``or []`` also covers a "tags" key that is present but None.
        current_tags = metadata.get("tags") or []

        prompt = TAG_PROMPT.format(
            table_name=table_name,
            description=description,
            columns=self._format_columns(metadata),
            current_tags=current_tags
        )

        try:
            response = await self.llm.generate_response(prompt)
            tags = self._parse_tags(response)
        except Exception as e:
            # Covers gateway failures as well as malformed JSON replies.
            logger.error("Failed to generate tags for %s: %s", table_name, e)
            return current_tags

        if tags is None:
            logger.warning("Tag response for %s was not a JSON array", table_name)
            return current_tags
        return tags

    async def enrich_table(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None,
        force_refresh: bool = False
    ) -> Dict[str, Any]:
        """
        Fully enrich a table's metadata with description and tags.

        Args:
            table_name: Name of the table
            metadata: Current catalog metadata
            sample_values: Optional sample data for context
            force_refresh: If True, regenerate even if already enriched

        Returns:
            Updated metadata dict (a shallow copy; *metadata* itself is not
            modified)
        """
        updated = metadata.copy()

        # Generate description if missing or forced
        if force_refresh or not metadata.get("semantic_description"):
            logger.info("Generating semantic description for %s...", table_name)
            updated["semantic_description"] = await self.generate_description(
                table_name, metadata, sample_values
            )

        # Generate tags when forced or when the table has only a few of them
        existing_tags = metadata.get("tags") or []
        if force_refresh or len(existing_tags) < self._MIN_EXISTING_TAGS:
            logger.info("Generating tags for %s...", table_name)
            # Pass ``updated`` so a freshly generated description feeds the tag prompt.
            tags = await self.generate_tags(table_name, updated)
            # Merge with existing, deduplicate, keep a stable order
            updated["tags"] = self._merge_tags(existing_tags, tags)

        return updated
# Module-level slot holding the instance handed out by get_catalog_enricher();
# stays None until the first call.
_catalog_enricher: Optional[CatalogEnricher] = None


def get_catalog_enricher() -> CatalogEnricher:
    """Return the shared catalog enricher, creating it on first use."""
    global _catalog_enricher
    if _catalog_enricher is not None:
        return _catalog_enricher
    _catalog_enricher = CatalogEnricher()
    return _catalog_enricher