| """ |
| Data Catalog Service |
| |
| Manages metadata for all datasets available in the platform. |
| Supports semantic search integration for scalable discovery. |
| """ |
|
|
| import json |
| import duckdb |
| import logging |
| from datetime import datetime |
| from pathlib import Path |
| from typing import List, Dict, Any, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
# Keyword -> tags lookup used by DataCatalog._infer_tags().
# A dataset whose (normalized, lowercase) table name CONTAINS a keyword
# receives all of that keyword's tags; multiple keywords can match and
# their tag lists are unioned. Keys are plain substrings, not regexes.
TAG_RULES = {
    "health": ["health", "facilities", "infrastructure"],
    "hospital": ["health", "facilities", "medical"],
    "clinic": ["health", "facilities", "medical"],
    "school": ["education", "facilities", "infrastructure"],
    "university": ["education", "facilities", "higher-education"],
    "education": ["education", "facilities"],
    "road": ["transportation", "infrastructure", "roads"],
    "street": ["transportation", "infrastructure", "roads"],
    "highway": ["transportation", "infrastructure", "roads"],
    "airport": ["transportation", "infrastructure", "aviation"],
    "port": ["transportation", "infrastructure", "maritime"],
    "population": ["demographics", "census", "population"],
    "census": ["demographics", "census", "statistics"],
    "admin": ["administrative", "boundaries", "government"],
    "district": ["administrative", "boundaries"],
    "province": ["administrative", "boundaries"],
    # NOTE: "corregimiento" is a Panamanian administrative subdivision.
    "corregimiento": ["administrative", "boundaries"],
    "park": ["recreation", "green-space", "amenities"],
    "water": ["hydrology", "natural-resources"],
    "river": ["hydrology", "water"],
    "forest": ["environment", "natural-resources", "land-cover"],
    "building": ["infrastructure", "built-environment"],
    "poi": ["points-of-interest", "amenities"],
}
|
|
|
|
class DataCatalog:
    """
    Singleton service managing dataset metadata.

    Features:
    - Auto-discovery of GeoJSON files in data directories
    - Schema inference from first record
    - Auto-tagging based on naming conventions
    - Integration with semantic search for scalable discovery

    The catalog is persisted as JSON at ``CATALOG_FILE`` and keyed by a
    normalized table name derived from each data file's name.
    """

    _instance = None

    # Data directory sits next to this package: <package_parent>/data
    DATA_DIR = Path(__file__).parent.parent / "data"
    CATALOG_FILE = DATA_DIR / "catalog.json"

    def __new__(cls):
        # Classic singleton: construct once, return the same object after.
        # `initialized` lets __init__ detect and skip repeat construction.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every DataCatalog() call even though __new__
        # returns the singleton, so bail out once set up.
        if self.initialized:
            return

        self.catalog: Dict[str, Any] = {}
        self.load_catalog()
        self.scan_and_update()
        self._init_semantic_search()
        self.initialized = True

    def load_catalog(self):
        """Load the catalog from ``CATALOG_FILE``, falling back to empty."""
        if self.CATALOG_FILE.exists():
            try:
                # Pin UTF-8: the catalog is written as UTF-8 JSON and must
                # not depend on the platform's default encoding.
                with open(self.CATALOG_FILE, 'r', encoding='utf-8') as f:
                    self.catalog = json.load(f)
            except Exception as e:
                # A corrupt catalog file must not prevent startup; it is
                # rebuilt by scan_and_update().
                logger.error("Failed to load catalog: %s", e)
                self.catalog = {}
        else:
            self.catalog = {}

    def save_catalog(self):
        """Persist the catalog to ``CATALOG_FILE`` (best-effort, logged)."""
        try:
            with open(self.CATALOG_FILE, 'w', encoding='utf-8') as f:
                json.dump(self.catalog, f, indent=2)
        except Exception as e:
            logger.error("Failed to save catalog: %s", e)

    def _infer_tags(self, table_name: str, columns: List[str]) -> List[str]:
        """Auto-generate tags based on table name and columns.

        Substring matches against TAG_RULES drive most tags; simple
        column-name heuristics add population/geographic/spatial tags.
        Returns a sorted list so catalog output is deterministic.
        """
        tags = set()
        name_lower = table_name.lower()

        for keyword, keyword_tags in TAG_RULES.items():
            if keyword in name_lower:
                tags.update(keyword_tags)

        columns_lower = [c.lower() for c in columns]
        if any('pop' in c for c in columns_lower):
            tags.add("population")
        if any('area' in c for c in columns_lower):
            tags.add("geographic")
        if 'geom' in columns_lower or 'geometry' in columns_lower:
            tags.add("spatial")

        # Sorted: list(set) order is arbitrary, which would make the saved
        # catalog (and tag previews in summaries) unstable across runs.
        return sorted(tags)

    def _infer_data_type(self, category: str, table_name: str) -> str:
        """Infer update cadence for a dataset: 'static' or 'semi-static'."""
        # Base layers (boundaries etc.) essentially never change.
        if category == "base":
            return "static"

        # OSM and HDX extracts get refreshed occasionally upstream.
        if category in ("osm", "hdx"):
            return "semi-static"

        # Census snapshots are fixed once published. (Same as the default,
        # kept to document intent explicitly.)
        if "census" in table_name.lower():
            return "static"

        return "static"

    def scan_and_update(self):
        """Scan data directories and update the catalog with new files.

        Each ``*.geojson`` / ``*.geojson.gz`` file is introspected with
        DuckDB's spatial extension for its columns and row count, then
        registered under a normalized table name. Files already indexed
        at the same relative path (with tags and data_type present) are
        skipped. Persists the catalog only if something changed.
        """
        logger.info("Scanning data directories...")

        subdirs = ['base', 'osm', 'inec', 'hdx', 'custom', 'overture', 'ms_buildings']

        # In-memory DuckDB used purely for schema/row-count introspection.
        con = duckdb.connect(':memory:')
        updated = False
        try:
            con.install_extension('spatial')
            con.load_extension('spatial')

            for subdir in subdirs:
                dir_path = self.DATA_DIR / subdir
                if not dir_path.exists():
                    continue

                for file_path in list(dir_path.glob('**/*.geojson')) + list(dir_path.glob('**/*.geojson.gz')):
                    # Normalize the file name into a SQL-friendly table name.
                    # NOTE: identically-named files in different subdirs
                    # collide on the same catalog key (last one wins).
                    table_name = (
                        file_path.name
                        .replace('.geojson.gz', '')
                        .replace('.geojson', '')
                        .lower()
                        .replace('-', '_')
                        .replace(' ', '_')
                    )

                    existing = self.catalog.get(table_name)
                    rel_path = str(file_path.relative_to(self.DATA_DIR))

                    # Already indexed at this path and carrying the newer
                    # tags/data_type fields -> nothing to do.
                    if existing and existing.get('path') == rel_path:
                        if 'tags' in existing and 'data_type' in existing:
                            continue

                    try:
                        logger.info("Indexing %s...", table_name)

                        # The path is embedded in a SQL string literal;
                        # escape single quotes SQL-style so a quote in a
                        # file name cannot break (or inject into) the query.
                        sql_path = str(file_path).replace("'", "''")

                        # Fetch one record to discover the column set.
                        query = f"SELECT * FROM ST_Read('{sql_path}') LIMIT 1"
                        df = con.execute(query).fetchdf()
                        columns = list(df.columns)

                        # Full count requires reading the file; acceptable
                        # because indexing happens once per file.
                        row_count_query = f"SELECT COUNT(*) FROM ST_Read('{sql_path}')"
                        row_count = con.execute(row_count_query).fetchone()[0]

                        tags = self._infer_tags(table_name, columns)
                        data_type = self._infer_data_type(subdir, table_name)

                        self.catalog[table_name] = {
                            "path": rel_path,
                            "description": f"Data from {subdir}/{file_path.name}",
                            "semantic_description": None,  # filled in by enrich_table()
                            "tags": tags,
                            "data_type": data_type,
                            "update_frequency": None,
                            "columns": columns,
                            "row_count": row_count,
                            "category": subdir,
                            "format": "geojson",
                            "last_indexed": datetime.now().isoformat()
                        }
                        updated = True

                    except Exception as e:
                        # Best-effort: one unreadable file must not abort
                        # the whole scan.
                        logger.warning("Failed to index %s: %s", file_path, e)
        finally:
            # Close the connection even if extension install/load or the
            # directory walk raises (the original leaked it on that path).
            con.close()

        if updated:
            self.save_catalog()
            logger.info("Catalog updated.")

    def _init_semantic_search(self):
        """Seed semantic-search embeddings from the current catalog (best-effort)."""
        try:
            # Imported lazily to avoid a hard dependency at module load.
            from backend.core.semantic_search import get_semantic_search
            semantic = get_semantic_search()

            new_embeddings = semantic.embed_all_tables(self.catalog)
            if new_embeddings > 0:
                logger.info("Created %s new semantic embeddings.", new_embeddings)
        except Exception as e:
            # Semantic search is an optional enhancement; keep the catalog
            # usable without it.
            logger.warning("Semantic search initialization failed: %s", e)

    def get_table_metadata(self, table_name: str) -> Optional[Dict]:
        """Get metadata for a specific table, or None if unknown."""
        return self.catalog.get(table_name)

    def get_all_table_summaries(self) -> str:
        """
        Returns a concise summary of all tables, grouped by category.

        WARNING: This can be very large with many datasets.
        Prefer using semantic_search.search() for discovery.
        """
        parts = ["Available Data Tables:\n"]

        # Group tables by category for readable output.
        by_category: Dict[str, List] = {}
        for name, meta in self.catalog.items():
            cat = meta.get('category', 'other')
            by_category.setdefault(cat, []).append((name, meta))

        for cat, items in by_category.items():
            parts.append(f"\n## {cat.upper()}\n")
            for name, meta in items:
                # Prefer the LLM-enriched description when present.
                desc = meta.get('semantic_description') or meta.get('description', 'No description')
                tags = meta.get('tags', [])
                tag_str = f" [{', '.join(tags[:3])}]" if tags else ""
                parts.append(f"- {name}: {desc}{tag_str}\n")

        return "".join(parts)

    def get_summaries_for_tables(self, table_names: List[str]) -> str:
        """
        Get summaries only for specified tables (unknown names are skipped).

        Used after semantic pre-filtering to build focused LLM context.
        """
        parts = ["Relevant Data Tables:\n\n"]

        for name in table_names:
            meta = self.catalog.get(name)
            if not meta:
                continue

            desc = meta.get('semantic_description') or meta.get('description', 'No description')
            tags = meta.get('tags', [])
            columns = meta.get('columns', [])[:10]  # cap to keep context small
            row_count = meta.get('row_count', 'unknown')

            parts.append(f"### {name}\n")
            parts.append(f"Description: {desc}\n")
            if tags:
                parts.append(f"Tags: {', '.join(tags)}\n")
            parts.append(f"Columns: {', '.join(columns)}\n")
            parts.append(f"Rows: {row_count}\n\n")

        return "".join(parts)

    def get_specific_table_schemas(self, table_names: List[str]) -> str:
        """Returns detailed schema (all columns) for specific tables."""
        output = ""
        for name in table_names:
            meta = self.catalog.get(name)
            if not meta:
                continue

            output += f"### {name}\n"
            output += f"Description: {meta.get('description')}\n"
            output += "Columns: " + ", ".join(meta.get('columns', [])) + "\n\n"
        return output

    def get_file_path(self, table_name: str) -> Optional[Path]:
        """Get the absolute path for a table's data file, or None if unknown."""
        meta = self.catalog.get(table_name)
        if meta and 'path' in meta:
            return self.DATA_DIR / meta['path']
        return None

    def get_tables_by_tag(self, tag: str) -> List[str]:
        """Get all table names that have a specific tag."""
        return [
            name for name, meta in self.catalog.items()
            if tag in meta.get('tags', [])
        ]

    def get_tables_by_category(self, category: str) -> List[str]:
        """Get all table names in a specific category."""
        return [
            name for name, meta in self.catalog.items()
            if meta.get('category') == category
        ]

    def get_stats(self) -> dict:
        """Return catalog statistics: totals, per-category and top-20 tag counts."""
        categories: Dict[str, int] = {}
        tags: Dict[str, int] = {}
        enriched_count = 0

        for meta in self.catalog.values():
            cat = meta.get('category', 'other')
            categories[cat] = categories.get(cat, 0) + 1

            # "Enriched" means an LLM-generated semantic description exists.
            if meta.get('semantic_description'):
                enriched_count += 1

            for tag in meta.get('tags', []):
                tags[tag] = tags.get(tag, 0) + 1

        return {
            "total_datasets": len(self.catalog),
            "enriched_datasets": enriched_count,
            "by_category": categories,
            "by_tag": dict(sorted(tags.items(), key=lambda x: -x[1])[:20]),
            "catalog_file": str(self.CATALOG_FILE)
        }

    async def enrich_table(self, table_name: str, force_refresh: bool = False) -> bool:
        """
        Enrich a single table with LLM-generated metadata.

        Skips tables already enriched unless ``force_refresh`` is set.
        Persists the catalog and refreshes the table's embedding on success.

        Returns True if enrichment was successful (or already done).
        """
        if table_name not in self.catalog:
            logger.warning("Table %s not found in catalog", table_name)
            return False

        metadata = self.catalog[table_name]

        if not force_refresh and metadata.get('semantic_description'):
            logger.info("Table %s already enriched, skipping", table_name)
            return True

        try:
            # Imported lazily to avoid a hard dependency at module load.
            from backend.core.catalog_enricher import get_catalog_enricher
            enricher = get_catalog_enricher()

            # Sample values give the enricher concrete context beyond names.
            sample_values = await self._get_sample_values(table_name)

            enriched = await enricher.enrich_table(table_name, metadata, sample_values, force_refresh)

            enriched['last_enriched'] = datetime.now().isoformat()
            self.catalog[table_name] = enriched
            self.save_catalog()

            # Keep the semantic index in sync with the new description.
            self._update_embedding(table_name, enriched)

            logger.info("Successfully enriched %s", table_name)
            return True

        except Exception as e:
            logger.error("Failed to enrich %s: %s", table_name, e)
            return False

    async def enrich_all_tables(self, force_refresh: bool = False) -> Dict[str, bool]:
        """
        Enrich all tables in the catalog, sequentially.

        Returns dict of table_name -> success status.
        """
        results = {}

        for table_name in self.catalog.keys():
            success = await self.enrich_table(table_name, force_refresh)
            results[table_name] = success

        return results

    async def _get_sample_values(self, table_name: str) -> Optional[Dict[str, str]]:
        """Get one row of sample values (truncated strings) for enrichment context.

        Returns None when the table cannot be read or is empty.
        """
        try:
            from backend.core.geo_engine import get_geo_engine
            geo_engine = get_geo_engine()

            geo_engine.ensure_table_loaded(table_name)

            # table_name is a normalized identifier produced by
            # scan_and_update (lowercase, [a-z0-9_]) — presumably safe to
            # interpolate; verify if table names ever come from users.
            result = geo_engine.con.execute(f"SELECT * FROM {table_name} LIMIT 1").fetchdf()

            if len(result) > 0:
                sample = {}
                for col in result.columns:
                    if col != 'geom':  # geometry blobs are useless as text samples
                        val = result[col].iloc[0]
                        if val is not None:
                            # Truncate to keep the enrichment prompt small.
                            sample[col] = str(val)[:50]
                return sample

        except Exception as e:
            logger.debug("Could not get sample values for %s: %s", table_name, e)

        return None

    def _update_embedding(self, table_name: str, metadata: Dict[str, Any]) -> None:
        """Update and persist the semantic-search embedding for a table (best-effort)."""
        try:
            from backend.core.semantic_search import get_semantic_search
            semantic = get_semantic_search()
            semantic.embed_table(table_name, metadata)
            semantic._save_embeddings()
        except Exception as e:
            logger.warning("Could not update embedding for %s: %s", table_name, e)
|
|
|
|
# Lazily-created module-level singleton handle.
_data_catalog = None


def get_data_catalog() -> DataCatalog:
    """Return the process-wide DataCatalog, constructing it on first use."""
    global _data_catalog
    if _data_catalog is None:
        _data_catalog = DataCatalog()
    return _data_catalog
|
|