File size: 7,318 Bytes
4851501
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
"""
Catalog Enricher Service

Automatically generates rich metadata for datasets using an LLM.
Enhances table descriptions and tags for better semantic search.
"""

import logging
from typing import Dict, List, Any, Optional
from backend.core.llm_gateway import LLMGateway

logger = logging.getLogger(__name__)


# Prompt template for generating semantic descriptions.
# CatalogEnricher.generate_description fills it with str.format(), supplying
# the placeholders table_name, category, columns, sample_values and row_count.
# Because str.format() is used, any literal "{" or "}" added to this text
# must be doubled ("{{" / "}}").
DESCRIPTION_PROMPT = """Generate a concise 2-3 sentence description for this geographic dataset.

Table Name: {table_name}
Category: {category}
Columns: {columns}
Sample Column Values: {sample_values}
Row Count: {row_count}

Focus on:
1. What geographic entities it contains (districts, health facilities, roads, etc.)
2. The geographic scope (Panama, specific province, etc.)
3. Common use cases (administrative analysis, health coverage, etc.)

Return ONLY the description, no formatting or labels."""

# Prompt template for generating/refining tags.
# CatalogEnricher.generate_tags fills it with str.format(), supplying the
# placeholders table_name, description, columns and current_tags, and then
# parses the model's reply as JSON. The square brackets in the example below
# are safe for str.format(); literal curly braces would need to be doubled.
TAG_PROMPT = """Suggest 5-8 relevant tags for this geographic dataset.

Table Name: {table_name}
Description: {description}
Columns: {columns}
Current Tags: {current_tags}

Rules:
1. Tags should be lowercase, single words or hyphenated
2. Include domain tags (health, education, infrastructure)
3. Include geographic tags (administrative, boundaries, points)
4. Include data type tags (census, osm, government)

Return ONLY a JSON array of strings, e.g. ["health", "facilities", "infrastructure"]"""


class CatalogEnricher:
    """
    Enriches catalog metadata with LLM-generated descriptions and tags.

    Can be run on-demand for new datasets or batch-run for existing ones.

    The class is a singleton: ``__new__`` always hands back the same
    instance and ``__init__`` builds the ``LLMGateway`` client only once.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Flag checked by __init__, which Python re-runs on every
            # ``CatalogEnricher()`` call even though the instance is reused.
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        if self.initialized:
            return

        self.llm = LLMGateway()
        self.initialized = True

    @staticmethod
    def _format_columns(columns: Any, limit: int = 15) -> str:
        """Render at most ``limit`` column entries as a comma-separated string.

        Tolerates a missing/``None`` column list and non-string entries so
        that building a prompt never raises.
        """
        return ", ".join(str(col) for col in list(columns or [])[:limit])

    @staticmethod
    def _parse_tag_response(response: str) -> Any:
        """Decode the JSON payload of an LLM tag response.

        Handles a bare JSON document, a Markdown-fenced one (optionally
        labelled ``json``), and, as a last resort, a JSON array embedded in
        surrounding prose.

        Raises:
            json.JSONDecodeError: if no decodable JSON can be found.
        """
        # Function-scope import: the module header does not import json.
        import json

        text = response.strip()
        if text.startswith("```"):
            # Keep only the content of the first fenced block.
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]

        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # The model may have wrapped the array in prose; try the span
            # from the first "[" to the last "]" before giving up.
            start, end = text.find("["), text.rfind("]")
            if start == -1 or end <= start:
                raise
            return json.loads(text[start:end + 1])

    @staticmethod
    def _clean_tags(tags: List[Any]) -> List[str]:
        """Normalize raw tags: lowercase, stripped, 2-30 chars, no duplicates.

        Non-string entries are dropped; first-seen order is preserved.
        """
        cleaned: List[str] = []
        seen = set()
        for tag in tags:
            if not isinstance(tag, str):
                continue
            tag = tag.lower().strip()
            if 2 <= len(tag) <= 30 and tag not in seen:
                seen.add(tag)
                cleaned.append(tag)
        return cleaned

    @staticmethod
    def _merge_tags(
        existing: Optional[List[str]],
        new: Optional[List[str]]
    ) -> List[str]:
        """Union two tag lists, existing tags first, preserving order.

        ``dict.fromkeys`` de-duplicates while keeping insertion order, so the
        result is deterministic (unlike a ``set`` union).
        """
        return list(dict.fromkeys([*(existing or []), *(new or [])]))

    async def generate_description(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Generate a semantic description for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict
            sample_values: Optional dict of column -> sample value

        Returns:
            Generated description string. If the LLM call fails or returns
            an empty response, falls back to the existing ``description`` in
            ``metadata`` or, when that is missing/empty, to a generic
            category-based sentence.
        """
        category = metadata.get("category", "unknown")
        row_count = metadata.get("row_count", "unknown")
        # ``or`` (not a .get default) so a present-but-empty description
        # still yields a usable string.
        fallback = metadata.get("description") or f"Geographic data from {category}"

        # Format sample values (first five only, to keep the prompt short)
        sample_str = "Not available"
        if sample_values:
            sample_str = ", ".join(f"{k}: {v}" for k, v in list(sample_values.items())[:5])

        prompt = DESCRIPTION_PROMPT.format(
            table_name=table_name,
            category=category,
            columns=self._format_columns(metadata.get("columns")),  # Limit columns
            sample_values=sample_str,
            row_count=row_count
        )

        try:
            response = await self.llm.generate_response(prompt)
            description = response.strip()
        except Exception as e:
            # Best-effort enrichment: never let an LLM failure break the caller.
            logger.error(f"Failed to generate description for {table_name}: {e}")
            return fallback

        if not description:
            logger.warning(f"LLM returned an empty description for {table_name}; using fallback")
            return fallback

        # Basic validation (advisory only; the text is still returned)
        if len(description) < 20 or len(description) > 500:
            logger.warning(f"Generated description for {table_name} seems unusual: {len(description)} chars")

        return description

    async def generate_tags(
        self,
        table_name: str,
        metadata: Dict[str, Any]
    ) -> List[str]:
        """
        Generate or refine tags for a dataset using LLM.

        Args:
            table_name: Name of the table
            metadata: Catalog metadata dict

        Returns:
            List of cleaned, de-duplicated tag strings. If the LLM call
            fails or its response is not a JSON array, a copy of the
            current tags from ``metadata`` is returned instead.
        """
        description = metadata.get("semantic_description") or metadata.get("description", "")
        current_tags = metadata.get("tags") or []

        prompt = TAG_PROMPT.format(
            table_name=table_name,
            description=description,
            columns=self._format_columns(metadata.get("columns")),
            current_tags=current_tags
        )

        try:
            response = await self.llm.generate_response(prompt)
            tags = self._parse_tag_response(response)
        except Exception as e:
            # Best-effort enrichment: keep what we already have on any failure.
            logger.error(f"Failed to generate tags for {table_name}: {e}")
            return list(current_tags)

        if not isinstance(tags, list):
            logger.warning(f"LLM tag response for {table_name} was not a JSON array; keeping current tags")
            return list(current_tags)

        return self._clean_tags(tags)

    async def enrich_table(
        self,
        table_name: str,
        metadata: Dict[str, Any],
        sample_values: Optional[Dict[str, str]] = None,
        force_refresh: bool = False
    ) -> Dict[str, Any]:
        """
        Fully enrich a table's metadata with description and tags.

        Args:
            table_name: Name of the table
            metadata: Current catalog metadata
            sample_values: Optional sample data for context
            force_refresh: If True, regenerate even if already enriched

        Returns:
            Updated metadata dict (a shallow copy; ``metadata`` itself is
            not modified)
        """
        updated = metadata.copy()
        existing_tags = metadata.get("tags") or []

        # Generate description if missing or forced
        if force_refresh or not metadata.get("semantic_description"):
            logger.info(f"Generating semantic description for {table_name}...")
            description = await self.generate_description(table_name, metadata, sample_values)
            updated["semantic_description"] = description

        # Generate/refine tags when forced or when fewer than 3 tags exist
        if force_refresh or len(existing_tags) < 3:
            logger.info(f"Generating tags for {table_name}...")
            # Pass ``updated`` so a freshly generated description feeds the tag prompt.
            tags = await self.generate_tags(table_name, updated)
            # Merge with existing, deduplicate, keep a stable order
            updated["tags"] = self._merge_tags(existing_tags, tags)

        return updated


# Module-level slot holding the shared enricher; filled on first access
_catalog_enricher: Optional[CatalogEnricher] = None


def get_catalog_enricher() -> CatalogEnricher:
    """Return the shared catalog enricher, creating it on the first call."""
    global _catalog_enricher
    if _catalog_enricher is not None:
        return _catalog_enricher
    _catalog_enricher = CatalogEnricher()
    return _catalog_enricher