File size: 7,318 Bytes
"""
Catalog Enricher Service

Automatically generates rich metadata for datasets using an LLM.
Enhances table descriptions and tags for better semantic search.
"""
import logging
from typing import Dict, List, Any, Optional
from backend.core.llm_gateway import LLMGateway
logger = logging.getLogger(__name__)
# Prompt template used to ask the LLM for a short semantic description of a
# dataset. It is filled in with str.format() by
# CatalogEnricher.generate_description and expects exactly these placeholders:
# table_name, category, columns, sample_values, row_count.
DESCRIPTION_PROMPT = """Generate a concise 2-3 sentence description for this geographic dataset.
Table Name: {table_name}
Category: {category}
Columns: {columns}
Sample Column Values: {sample_values}
Row Count: {row_count}
Focus on:
1. What geographic entities it contains (districts, health facilities, roads, etc.)
2. The geographic scope (Panama, specific province, etc.)
3. Common use cases (administrative analysis, health coverage, etc.)
Return ONLY the description, no formatting or labels."""
# Prompt template used to ask the LLM for new or refined tags. It is filled in
# with str.format() by CatalogEnricher.generate_tags and expects exactly these
# placeholders: table_name, description, columns, current_tags.
# The model is instructed to answer with a bare JSON array of strings, which
# the caller then parses as JSON. The example array below contains no curly
# braces, so it does not interfere with str.format().
TAG_PROMPT = """Suggest 5-8 relevant tags for this geographic dataset.
Table Name: {table_name}
Description: {description}
Columns: {columns}
Current Tags: {current_tags}
Rules:
1. Tags should be lowercase, single words or hyphenated
2. Include domain tags (health, education, infrastructure)
3. Include geographic tags (administrative, boundaries, points)
4. Include data type tags (census, osm, government)
Return ONLY a JSON array of strings, e.g. ["health", "facilities", "infrastructure"]"""
class CatalogEnricher:
    """
    Enriches catalog metadata with LLM-generated descriptions and tags.

    Can be run on-demand for new datasets or batch-run for existing ones.

    The class is a singleton: every ``CatalogEnricher()`` call returns the
    same instance and the LLM gateway is only constructed the first time.
    """

    _instance = None

    # Limits on how much catalog context is placed into a prompt.
    _MAX_PROMPT_COLUMNS = 15
    _MAX_SAMPLE_VALUES = 5

    # Descriptions outside this length range are logged as suspicious
    # (they are still returned unless they are empty).
    _MIN_DESCRIPTION_CHARS = 20
    _MAX_DESCRIPTION_CHARS = 500

    # Tags outside this length range are discarded.
    _MIN_TAG_CHARS = 2
    _MAX_TAG_CHARS = 30

    # Tables with fewer existing tags than this get tags generated.
    _MIN_EXISTING_TAGS = 3

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every CatalogEnricher() call; this flag lets it
            # skip re-creating the gateway after the first construction.
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        if self.initialized:
            return
        self.llm = LLMGateway()
        self.initialized = True

    @classmethod
    def _format_columns(cls, metadata: Dict[str, Any]) -> str:
        """Return the first few column names of ``metadata`` as a comma-separated string."""
        # ``or []`` covers both a missing key and an explicit None; str() keeps
        # the join from failing when a column entry is not a plain string.
        columns = metadata.get("columns") or []
        return ", ".join(str(col) for col in list(columns)[: cls._MAX_PROMPT_COLUMNS])

    @classmethod
    def _parse_tags(cls, response: str) -> Optional[List[str]]:
        """
        Extract a cleaned tag list from a raw LLM reply.

        Args:
            response: Raw text returned by the LLM

        Returns:
            Lowercased, stripped, de-duplicated tags (order preserved), or
            None when the reply does not contain a JSON array.
        """
        import json

        text = response.strip()

        # Try the reply as-is first; if that is not valid JSON (markdown code
        # fence, any language label, surrounding prose), retry on the
        # outermost [...] span.
        candidates = [text]
        start, end = text.find("["), text.rfind("]")
        if 0 <= start < end:
            candidates.append(text[start:end + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            break
        else:
            return None

        if not isinstance(parsed, list):
            return None

        normalized = (tag.lower().strip() for tag in parsed if isinstance(tag, str))
        # dict.fromkeys removes duplicates (e.g. "Health" and "health") while
        # keeping the order the model produced.
        return list(dict.fromkeys(
            tag for tag in normalized
            if cls._MIN_TAG_CHARS <= len(tag) <= cls._MAX_TAG_CHARS
        ))

    @staticmethod
    def _merge_tags(existing: List[str], new: List[str]) -> List[str]:
        """Return ``existing`` followed by unseen entries of ``new``, without duplicates."""
        # Unlike a set union, this yields the same order on every run.
        return list(dict.fromkeys([*existing, *new]))

    async def generate_description(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Generate a semantic description for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict
            sample_values: Optional dict of column -> sample value

        Returns:
            Generated description string. If the LLM call fails or returns
            an empty reply, the existing ``description`` from ``metadata`` is
            returned, or a generic category-based sentence when there is none.
        """
        category = metadata.get("category", "unknown")
        row_count = metadata.get("row_count", "unknown")
        fallback = metadata.get("description") or f"Geographic data from {category}"

        # Format sample values
        sample_str = "Not available"
        if sample_values:
            sample_items = list(sample_values.items())[: self._MAX_SAMPLE_VALUES]
            sample_str = ", ".join(f"{k}: {v}" for k, v in sample_items)

        prompt = DESCRIPTION_PROMPT.format(
            table_name=table_name,
            category=category,
            columns=self._format_columns(metadata),  # Limit columns
            sample_values=sample_str,
            row_count=row_count
        )

        try:
            response = await self.llm.generate_response(prompt)
            description = response.strip()
        except Exception as e:
            # Best-effort enrichment: any gateway failure degrades to the fallback.
            logger.error("Failed to generate description for %s: %s", table_name, e)
            return fallback

        if not description:
            # An empty reply carries no information; do not store it.
            logger.warning("LLM returned an empty description for %s; using fallback", table_name)
            return fallback

        # Basic validation
        if not self._MIN_DESCRIPTION_CHARS <= len(description) <= self._MAX_DESCRIPTION_CHARS:
            logger.warning(
                "Generated description for %s seems unusual: %s chars",
                table_name,
                len(description),
            )
        return description

    async def generate_tags(
        self,
        table_name: str,
        metadata: Dict[str, Any]
    ) -> List[str]:
        """
        Generate or refine tags for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict

        Returns:
            List of tag strings. When the LLM call fails or its reply
            contains no JSON array, the tags already present in ``metadata``
            are returned (an empty list if there are none).
        """
        description = metadata.get("semantic_description") or metadata.get("description") or ""
        current_tags = metadata.get("tags") or []

        prompt = TAG_PROMPT.format(
            table_name=table_name,
            description=description,
            columns=self._format_columns(metadata),
            current_tags=current_tags
        )

        try:
            response = await self.llm.generate_response(prompt)
            tags = self._parse_tags(response)
        except Exception as e:
            # Best-effort enrichment: any gateway failure keeps the current tags.
            logger.error("Failed to generate tags for %s: %s", table_name, e)
            return current_tags

        return current_tags if tags is None else tags

    async def enrich_table(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None,
        force_refresh: bool = False
    ) -> Dict[str, Any]:
        """
        Fully enrich a table's metadata with description and tags.

        Args:
            table_name: Name of the table
            metadata: Current catalog metadata
            sample_values: Optional sample data for context
            force_refresh: If True, regenerate even if already enriched

        Returns:
            Updated metadata dict (a shallow copy; ``metadata`` itself is
            not modified)
        """
        updated = metadata.copy()

        # Generate description if missing or forced
        if force_refresh or not metadata.get("semantic_description"):
            logger.info("Generating semantic description for %s...", table_name)
            updated["semantic_description"] = await self.generate_description(
                table_name, metadata, sample_values
            )

        # Generate/refine tags when forced or when the table has too few tags
        existing_tags = metadata.get("tags") or []
        if force_refresh or len(existing_tags) < self._MIN_EXISTING_TAGS:
            logger.info("Generating tags for %s...", table_name)
            # Pass ``updated`` so a freshly generated description feeds the tag prompt.
            new_tags = await self.generate_tags(table_name, updated)
            # Merge with existing, deduplicate
            updated["tags"] = self._merge_tags(existing_tags, new_tags)

        return updated
# Module-level cache for the shared enricher; filled on the first accessor call.
_catalog_enricher: Optional[CatalogEnricher] = None


def get_catalog_enricher() -> CatalogEnricher:
    """Return the shared catalog enricher, creating it on first use."""
    global _catalog_enricher
    if _catalog_enricher is not None:
        return _catalog_enricher
    _catalog_enricher = CatalogEnricher()
    return _catalog_enricher
|