Mutsynchub / app /service /schema_resolver.py
shaliz-kong
Initial commit: self-hosted Redis, DuckDB, Analytics Engine
98a466d
# app/service/schema_resolver.py
from typing import Optional
from app.schemas.org_schema import OrgSchema
from app.service.llm_service import LocalLLMService
import logging
logger = logging.getLogger(__name__)
class SchemaResolver:
    """Resolve semantic field names to concrete column names for one org.

    Lookups go through the organisation's ``OrgSchema`` mapping. Stored
    mappings for business-critical fields are additionally confirmed by the
    local LLM before being returned.
    """

    # Semantic fields whose stored mapping is double-checked by the LLM.
    _CRITICAL_FIELDS = frozenset({"total", "timestamp", "transaction_id"})

    def __init__(self, org_id: str):
        """Bind the resolver to ``org_id`` and create its schema/LLM helpers."""
        self.org_id = org_id
        self.schema = OrgSchema(org_id)
        self.llm = LocalLLMService()

    def resolve_with_certainty(self, semantic_field: str) -> Optional[str]:
        """Return the column mapped to ``semantic_field``, or ``None``.

        * Non-critical fields: the stored mapping is returned as-is.
        * Critical fields (see ``_CRITICAL_FIELDS``): the stored mapping is
          returned only if the LLM check in ``_verify_critical_field``
          accepts it.
        * No (or empty) stored mapping: the gap is reported through
          ``_learn_new_mapping`` and ``None`` is returned.
        """
        # NOTE(review): get_mapping() is assumed to return a dict-like
        # semantic -> column mapping; a falsy result is treated as "no
        # mappings" instead of crashing on ``.get``.
        mapping = self.schema.get_mapping() or {}
        column = mapping.get(semantic_field)

        if not column:
            # No match found - hand over to the learning hook.
            return self._learn_new_mapping(semantic_field)

        if semantic_field in self._CRITICAL_FIELDS:
            return self._verify_critical_field(semantic_field, column)
        return column

    def _verify_critical_field(self, semantic: str, candidate: str) -> Optional[str]:
        """Ask the LLM whether ``candidate`` really represents ``semantic``.

        Returns ``candidate`` when the model answers YES and ``None`` when it
        answers anything else. If the LLM call itself fails, the stored
        ``candidate`` is kept (best-effort fallback) and the failure is
        logged instead of being silently swallowed.
        """
        prompt = (
            "\n"
            f"Verify: Does column '{candidate}' represent '{semantic}'?\n"
            "Return ONLY 'YES' or 'NO'. Consider business logic and data patterns.\n"
        )
        try:
            response = self.llm.generate(prompt, max_tokens=5)
            # Models frequently answer "Yes", "yes." or "YES\n"; normalise
            # case, whitespace and trailing punctuation before comparing so
            # a valid confirmation is not rejected on formatting alone.
            verdict = response.strip().strip(".!'\"").upper()
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate.
            logger.warning(
                "[Schema] LLM verification failed for %s.%s; "
                "keeping stored mapping '%s'",
                self.org_id,
                semantic,
                candidate,
                exc_info=True,
            )
            return candidate
        return candidate if verdict == "YES" else None

    def _learn_new_mapping(self, semantic: str) -> Optional[str]:
        """Report a semantic field that has no mapping yet.

        Currently a placeholder for a feedback/learning loop: it only logs a
        warning and always returns ``None``.
        """
        logger.warning("[Schema] Need training for: %s.%s", self.org_id, semantic)
        return None