| | |
| | from typing import Optional |
| | from app.schemas.org_schema import OrgSchema |
| | from app.service.llm_service import LocalLLMService |
| | import logging |
| |
|
logger = logging.getLogger(__name__)


class SchemaResolver:
    """Resolve semantic field names to an organisation's column names.

    The stored mapping is read from ``OrgSchema``. Mappings for fields listed
    in ``CRITICAL_FIELDS`` are additionally confirmed with the local LLM
    before they are returned. Fields with no stored mapping are only logged
    as needing training; nothing is learned or persisted here.
    """

    # Semantic fields whose stored mapping must be re-checked by the LLM.
    CRITICAL_FIELDS = frozenset({"total", "timestamp", "transaction_id"})

    def __init__(self, org_id: str):
        """Bind the resolver to one organisation's schema and an LLM client."""
        self.org_id = org_id
        self.schema = OrgSchema(org_id)
        self.llm = LocalLLMService()

    def resolve_with_certainty(self, semantic_field: str) -> Optional[str]:
        """Return the column mapped to *semantic_field*, or ``None``.

        Args:
            semantic_field: Semantic name to look up in the org mapping.

        Returns:
            The mapped column name. For fields in ``CRITICAL_FIELDS`` the
            result of ``_verify_critical_field`` is returned instead, which
            is ``None`` when the LLM rejects the mapping. When no (non-empty)
            mapping exists, the result of ``_learn_new_mapping`` is returned.
        """
        # A missing mapping object is treated like an empty one instead of
        # failing with AttributeError on ``None.get``.
        mapping = self.schema.get_mapping() or {}
        column = mapping.get(semantic_field)

        if not column:
            return self._learn_new_mapping(semantic_field)
        if semantic_field in self.CRITICAL_FIELDS:
            return self._verify_critical_field(semantic_field, column)
        return column

    def _verify_critical_field(self, semantic: str, candidate: str) -> Optional[str]:
        """Ask the LLM whether *candidate* really represents *semantic*.

        Args:
            semantic: Semantic field name being resolved.
            candidate: Column name taken from the stored mapping.

        Returns:
            *candidate* when the LLM answers YES (case-insensitive, ignoring
            surrounding whitespace, quotes and trailing punctuation), ``None``
            for any other answer. If the LLM call or answer parsing raises,
            *candidate* is returned unverified.
        """
        prompt = (
            f"\nVerify: Does column '{candidate}' represent '{semantic}'?\n\n"
            "Return ONLY 'YES' or 'NO'. Consider business logic and data patterns.\n"
        )
        try:
            response = self.llm.generate(prompt, max_tokens=5)
            # Models often answer "Yes", "YES." or "'YES'"; normalise so a
            # formatting quirk does not reject a valid mapping.
            verdict = response.strip().strip("'\"`.!").strip().upper()
        except Exception:
            # NOTE(review): deliberately fail-open (original behaviour) so an
            # LLM outage does not block resolution; confirm this is desired
            # for business-critical fields.
            logger.warning(
                "[Schema] LLM verification failed for %s.%s; "
                "returning unverified column %r",
                self.org_id,
                semantic,
                candidate,
                exc_info=True,
            )
            return candidate
        return candidate if verdict == "YES" else None

    def _learn_new_mapping(self, semantic: str) -> Optional[str]:
        """Log that *semantic* has no mapping for this org; always return ``None``."""
        logger.warning("[Schema] Need training for: %s.%s", self.org_id, semantic)
        return None