|
|
""" |
|
|
Catalog Enricher Service |
|
|
|
|
|
Automatically generates rich metadata for datasets using LLM. |
|
|
Enhances table descriptions and tags for better semantic search. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from typing import Dict, List, Any, Optional |
|
|
from backend.core.llm_gateway import LLMGateway |
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
# Template for the description request sent to the LLM.
# Filled with str.format(); the placeholders are table_name, category,
# columns, sample_values and row_count.
DESCRIPTION_PROMPT = """Generate a concise 2-3 sentence description for this geographic dataset.

Table Name: {table_name}
Category: {category}
Columns: {columns}
Sample Column Values: {sample_values}
Row Count: {row_count}

Focus on:
1. What geographic entities it contains (districts, health facilities, roads, etc.)
2. The geographic scope (Panama, specific province, etc.)
3. Common use cases (administrative analysis, health coverage, etc.)

Return ONLY the description, no formatting or labels."""
|
|
|
|
|
|
|
|
# Template for the tag-suggestion request sent to the LLM.
# Filled with str.format(); the placeholders are table_name, description,
# columns and current_tags. The reply is expected to be a JSON array.
TAG_PROMPT = """Suggest 5-8 relevant tags for this geographic dataset.

Table Name: {table_name}
Description: {description}
Columns: {columns}
Current Tags: {current_tags}

Rules:
1. Tags should be lowercase, single words or hyphenated
2. Include domain tags (health, education, infrastructure)
3. Include geographic tags (administrative, boundaries, points)
4. Include data type tags (census, osm, government)

Return ONLY a JSON array of strings, e.g. ["health", "facilities", "infrastructure"]"""
|
|
|
|
|
|
|
|
class CatalogEnricher:
    """
    Enriches catalog metadata with LLM-generated descriptions and tags.

    The class is a singleton: every ``CatalogEnricher()`` call returns the
    same object, and the LLM gateway is only created the first time.

    Can be run on-demand for new datasets or batch-run for existing ones.
    """

    # Caps that keep the prompts short for wide tables.
    MAX_PROMPT_COLUMNS = 15
    MAX_SAMPLE_VALUES = 5

    # Bounds used to sanity-check generated output.
    MIN_DESCRIPTION_CHARS = 20
    MAX_DESCRIPTION_CHARS = 500
    MIN_TAG_CHARS = 2
    MAX_TAG_CHARS = 30

    # Tables with fewer existing tags than this get new tags generated.
    MIN_TAG_COUNT = 3

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        """Create the LLM gateway once; later constructions are no-ops."""
        # __init__ runs on every CatalogEnricher() call, even though __new__
        # hands back the same object, so guard against repeated set-up.
        if self.initialized:
            return

        self.llm = LLMGateway()
        self.initialized = True

    @classmethod
    def _format_columns(cls, metadata: Dict[str, Any]) -> str:
        """Render the first MAX_PROMPT_COLUMNS columns as a comma-separated string."""
        # Tolerate a missing/None column list and non-string column entries,
        # both of which would otherwise make str.join raise.
        columns = list(metadata.get("columns") or [])
        return ", ".join(str(col) for col in columns[: cls.MAX_PROMPT_COLUMNS])

    @classmethod
    def _parse_tags(cls, response: str) -> List[str]:
        """
        Extract a clean, de-duplicated tag list from a raw LLM reply.

        Returns an empty list when the reply contains no JSON array.
        Raises ValueError (json.JSONDecodeError) when the array is malformed.
        """
        import json

        text = response.strip()
        # Take the outermost [...] span so that markdown fences, a "json"
        # language label, or surrounding prose do not break parsing.
        start, end = text.find("["), text.rfind("]")
        if start == -1 or end < start:
            return []

        parsed = json.loads(text[start:end + 1])
        candidates = (
            item.lower().strip() for item in parsed if isinstance(item, str)
        )
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(
            dict.fromkeys(
                tag
                for tag in candidates
                if cls.MIN_TAG_CHARS <= len(tag) <= cls.MAX_TAG_CHARS
            )
        )

    @staticmethod
    def _merge_tags(existing: List[str], new: List[str]) -> List[str]:
        """Union two tag lists: existing tags first, new ones after, no duplicates."""
        return list(dict.fromkeys([*existing, *new]))

    async def generate_description(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Generate a semantic description for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict
            sample_values: Optional dict of column -> sample value

        Returns:
            Generated description string. If the LLM call fails or returns
            nothing, the existing ``description`` from metadata is returned,
            or a generic category-based sentence when that is missing/empty.
        """
        category = metadata.get("category", "unknown")
        row_count = metadata.get("row_count", "unknown")

        sample_str = "Not available"
        if sample_values:
            preview = list(sample_values.items())[: self.MAX_SAMPLE_VALUES]
            sample_str = ", ".join(f"{k}: {v}" for k, v in preview)

        prompt = DESCRIPTION_PROMPT.format(
            table_name=table_name,
            category=category,
            columns=self._format_columns(metadata),
            sample_values=sample_str,
            row_count=row_count
        )

        # `or` (not a .get default) so that a present-but-empty description
        # still yields a usable string.
        fallback = metadata.get("description") or f"Geographic data from {category}"

        # Enrichment is best-effort: any gateway failure degrades to the
        # fallback instead of aborting the caller.
        try:
            response = await self.llm.generate_response(prompt)
            description = response.strip()
        except Exception as e:
            logger.error("Failed to generate description for %s: %s", table_name, e)
            return fallback

        if not description:
            logger.warning(
                "LLM returned an empty description for %s; using fallback", table_name
            )
            return fallback

        # Out-of-range lengths are only reported; the text is still returned.
        if not (
            self.MIN_DESCRIPTION_CHARS <= len(description) <= self.MAX_DESCRIPTION_CHARS
        ):
            logger.warning(
                "Generated description for %s seems unusual: %d chars",
                table_name,
                len(description),
            )

        return description

    async def generate_tags(
        self,
        table_name: str,
        metadata: Dict[str, Any]
    ) -> List[str]:
        """
        Generate or refine tags for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict

        Returns:
            List of tag strings (lowercased, stripped, 2-30 characters,
            without duplicates). A copy of the current tags from metadata is
            returned when the LLM call fails or yields no usable tags.
        """
        description = metadata.get("semantic_description") or metadata.get("description", "")
        # Copy so callers never receive (and mutate) the list stored in metadata.
        current_tags = list(metadata.get("tags") or [])

        prompt = TAG_PROMPT.format(
            table_name=table_name,
            description=description,
            columns=self._format_columns(metadata),
            current_tags=current_tags
        )

        # Best-effort: gateway errors and malformed JSON both fall back to
        # the tags the table already has.
        try:
            response = await self.llm.generate_response(prompt)
            tags = self._parse_tags(response)
        except Exception as e:
            logger.error("Failed to generate tags for %s: %s", table_name, e)
            return current_tags

        if not tags:
            logger.warning(
                "LLM returned no usable tags for %s; keeping current tags", table_name
            )
            return current_tags

        return tags

    async def enrich_table(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None,
        force_refresh: bool = False
    ) -> Dict[str, Any]:
        """
        Fully enrich a table's metadata with description and tags.

        Args:
            table_name: Name of the table
            metadata: Current catalog metadata
            sample_values: Optional sample data for context
            force_refresh: If True, regenerate even if already enriched

        Returns:
            Updated metadata dict (a shallow copy; the input is not modified)
        """
        updated = metadata.copy()

        if force_refresh or not metadata.get("semantic_description"):
            logger.info("Generating semantic description for %s...", table_name)
            updated["semantic_description"] = await self.generate_description(
                table_name, metadata, sample_values
            )

        existing_tags = list(metadata.get("tags") or [])
        if force_refresh or len(existing_tags) < self.MIN_TAG_COUNT:
            logger.info("Generating tags for %s...", table_name)
            # Pass `updated` so a freshly generated description feeds the tag prompt.
            new_tags = await self.generate_tags(table_name, updated)
            # Ordered merge keeps the stored tag list stable between runs.
            updated["tags"] = self._merge_tags(existing_tags, new_tags)

        return updated
|
|
|
|
|
|
|
|
|
|
|
# Cached enricher used by get_catalog_enricher(); None until the first call.
_catalog_enricher: Optional[CatalogEnricher] = None
|
|
|
|
|
|
|
|
def get_catalog_enricher() -> CatalogEnricher:
    """Return the module-wide catalog enricher, building it on the first call."""
    global _catalog_enricher
    # Fast path: an enricher was already created by an earlier call.
    if _catalog_enricher is not None:
        return _catalog_enricher
    _catalog_enricher = CatalogEnricher()
    return _catalog_enricher
|
|
|