Spaces:
Paused
Paused
| # app/services/schema_resolver.py | |
| from typing import Optional | |
| from app.schemas.org_schema import OrgSchema | |
| from app.service.llm_service import LocalLLMService | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class SchemaResolver:
    """Resolve semantic field names to an organisation's column names.

    The stored mapping is read from ``OrgSchema``. For a small set of
    business-critical fields the mapping is additionally confirmed by the
    local LLM before it is returned.
    """

    # Semantic fields whose stored mapping is double-checked with the LLM.
    _CRITICAL_FIELDS = frozenset({"total", "timestamp", "transaction_id"})

    def __init__(self, org_id: str):
        """Create a resolver bound to the organisation ``org_id``."""
        self.org_id = org_id
        self.schema = OrgSchema(org_id)
        self.llm = LocalLLMService()

    def resolve_with_certainty(self, semantic_field: str) -> Optional[str]:
        """Return the column mapped to ``semantic_field``, or ``None``.

        * A mapped, non-critical field is returned as stored.
        * A mapped, critical field is returned only if the LLM confirms it
          (see ``_verify_critical_field``).
        * An unmapped field is reported via ``_learn_new_mapping`` and
          yields ``None``.

        No numeric confidence score is computed here; "certainty" means
        "present in the mapping and, for critical fields, LLM-confirmed".
        """
        # Tolerate a schema that has no mapping yet (None or empty).
        mapping = self.schema.get_mapping() or {}
        column = mapping.get(semantic_field)

        if not column:
            # No usable mapping stored for this field.
            return self._learn_new_mapping(semantic_field)

        if semantic_field in self._CRITICAL_FIELDS:
            return self._verify_critical_field(semantic_field, column)
        return column

    def _verify_critical_field(self, semantic: str, candidate: str) -> Optional[str]:
        """Ask the LLM whether column ``candidate`` represents ``semantic``.

        Returns ``candidate`` when the first word of the reply is "yes"
        (case-insensitive, surrounding punctuation ignored) and ``None`` for
        any other reply. If the LLM call itself fails, the stored mapping is
        accepted unverified (fail-open) and the failure is logged.
        """
        prompt = (
            "\n"
            f"Verify: Does column '{candidate}' represent '{semantic}'?\n"
            "Return ONLY 'YES' or 'NO'. Consider business logic and data patterns.\n"
        )
        try:
            response = self.llm.generate(prompt, max_tokens=5)
            words = response.strip().split()
        except Exception:
            # Best-effort verification: an unavailable or misbehaving LLM must
            # not block resolution, but the skipped check should be visible.
            logger.warning(
                "[Schema] LLM verification failed for %s.%s; "
                "accepting column '%s' unverified",
                self.org_id,
                semantic,
                candidate,
                exc_info=True,
            )
            return candidate

        # Models rarely answer with a bare "YES": normalise case and trim
        # quotes/punctuation so "yes", "Yes." and "YES, ..." all count.
        verdict = words[0].strip("'\"`.,!:;*").upper() if words else ""
        return candidate if verdict == "YES" else None

    def _learn_new_mapping(self, semantic: str) -> Optional[str]:
        """Report that ``semantic`` has no mapping for this organisation.

        Currently this only logs a warning and returns ``None``; the
        feedback-loop integration is not implemented in this method.
        """
        logger.warning("[Schema] Need training for: %s.%s", self.org_id, semantic)
        return None