File size: 2,045 Bytes
98a466d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# app/services/schema_resolver.py
from typing import Optional
from app.schemas.org_schema import OrgSchema
from app.service.llm_service import LocalLLMService
import logging

logger = logging.getLogger(__name__)


class SchemaResolver:
    """
    Resolve semantic field names to concrete column names for one organisation.

    The mapping is read from ``OrgSchema(org_id).get_mapping()``. Mappings for
    business-critical fields are additionally confirmed by asking the local
    LLM a YES/NO question. Learning of unknown fields is not implemented
    here: a missing mapping is only logged.
    """

    # Semantic fields whose mapping is double-checked with the LLM before use.
    CRITICAL_FIELDS = frozenset({"total", "timestamp", "transaction_id"})

    def __init__(self, org_id: str):
        """Bind the resolver to ``org_id`` and create its schema and LLM helpers."""
        self.org_id = org_id
        self.schema = OrgSchema(org_id)
        self.llm = LocalLLMService()

    def resolve_with_certainty(self, semantic_field: str) -> Optional[str]:
        """
        Return the column mapped to ``semantic_field``, or ``None``.

        * No (or empty) mapping for the field: the gap is logged via
          ``_learn_new_mapping`` and ``None`` is returned.
        * Field listed in ``CRITICAL_FIELDS``: the mapped column is returned
          only if ``_verify_critical_field`` accepts it, otherwise ``None``.
        * Any other field: the mapped column is returned as-is.

        Note: no numeric confidence score is computed; "certainty" here means
        "present in the mapping and, for critical fields, confirmed by the LLM".
        """
        # Tolerate a schema that has no mapping yet (None) instead of crashing.
        mapping = self.schema.get_mapping() or {}
        column = mapping.get(semantic_field)

        if not column:
            # No match found - record that this field needs a mapping.
            return self._learn_new_mapping(semantic_field)

        if semantic_field in self.CRITICAL_FIELDS:
            return self._verify_critical_field(semantic_field, column)
        return column

    def _verify_critical_field(self, semantic: str, candidate: str) -> Optional[str]:
        """
        Ask the LLM whether column ``candidate`` represents ``semantic``.

        Returns ``candidate`` when the reply is an affirmative "YES" (case and
        surrounding punctuation are ignored), ``None`` for any other reply.
        If the LLM call or reply parsing raises, the failure is logged and
        ``candidate`` is returned unchanged (best-effort, fail-open), so an
        unavailable LLM does not block an existing mapping.
        """
        prompt = f"""
            Verify: Does column '{candidate}' represent '{semantic}'?
            
            Return ONLY 'YES' or 'NO'. Consider business logic and data patterns.
            """
        try:
            reply = self.llm.generate(prompt, max_tokens=5)
            verdict = self._normalize_verdict(reply)
        except Exception:
            # Deliberately fail-open, but never silently: keep the traceback.
            # ``Exception`` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            logger.exception(
                "[Schema] LLM verification failed for %s.%s; keeping column %r",
                self.org_id,
                semantic,
                candidate,
            )
            return candidate
        return candidate if verdict == "YES" else None

    @staticmethod
    def _normalize_verdict(response: str) -> str:
        """Return the first word of ``response``, upper-cased and stripped of quotes/punctuation."""
        words = response.strip().split()
        if not words:
            return ""
        return words[0].strip("'\"`.,!:;*").upper()

    def _learn_new_mapping(self, semantic: str) -> Optional[str]:
        """
        Record that ``semantic`` has no column mapping for this organisation.

        Currently a placeholder: it only emits a warning and always returns
        ``None``. (Intended integration point for a feedback loop, per the
        original author's note.)
        """
        logger.warning("[Schema] Need training for: %s.%s", self.org_id, semantic)
        return None