|
|
import json |
|
|
import os |
|
|
from openai import OpenAI |
|
|
from json_logic import jsonLogic |
|
|
from loguru import logger |
|
|
from app.constants import MOCK_STORE_SAMPLES |
|
|
from app.models import KeyMapping |
|
|
|
|
|
|
|
|
class RuleGenerationService: |
|
|
""" Generates json logic rules using gpt-4o-mini |
|
|
Uses self-consistency voting to pick best rule from multiple attempts |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
try: |
|
|
self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) |
|
|
self.model = "gpt-4o-mini" |
|
|
logger.success("RuleGenerationService initialized") |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to initialize RuleGenerationService: {str(e)}") |
|
|
raise |
|
|
|
|
|
def build_system_prompt(self, available_keys, policy_context): |
|
|
|
|
|
try: |
|
|
keys_str = json.dumps(available_keys, indent=2) |
|
|
|
|
|
system_prompt = f"""You are an expert JSON Logic rule generator for loan application systems. |
|
|
|
|
|
AVAILABLE KEYS (use ONLY these in {{"var": "key"}}): |
|
|
{keys_str} |
|
|
|
|
|
POLICY CONTEXT: |
|
|
{policy_context} |
|
|
|
|
|
JSON LOGIC OPERATORS: |
|
|
- Logical: "and", "or", "!", "if" |
|
|
- Comparison: ">", "<", ">=", "<=", "==", "!=" |
|
|
- Arrays: "in", "some", "all" |
|
|
- Math: "+", "-", "*", "/" |
|
|
|
|
|
RULES: |
|
|
1. Use ONLY the available keys listed above |
|
|
2. All keys must be referenced using {{"var": "key.name"}} |
|
|
3. Generate valid JSON Logic syntax |
|
|
4. Be precise with thresholds from policies |
|
|
5. Use "and" for multiple conditions, "or" for alternatives |
|
|
|
|
|
OUTPUT FORMAT (must be valid JSON): |
|
|
{{ |
|
|
"json_logic": {{"and": [...]}}, |
|
|
"explanation": "Brief 1-2 sentence explanation", |
|
|
"used_keys": ["key1", "key2"], |
|
|
"confidence": 0.0-1.0 |
|
|
}} |
|
|
|
|
|
EXAMPLES: |
|
|
|
|
|
User: "Approve if bureau score > 700" |
|
|
Output: |
|
|
{{ |
|
|
"json_logic": {{">": [{{"var": "bureau.score"}}, 700]}}, |
|
|
"explanation": "Approves applications where bureau score exceeds 700.", |
|
|
"used_keys": ["bureau.score"], |
|
|
"confidence": 0.95 |
|
|
}} |
|
|
|
|
|
User: "Reject if wilful default OR suit filed" |
|
|
Output: |
|
|
{{ |
|
|
"json_logic": {{"or": [ |
|
|
{{"==": [{{"var": "bureau.wilful_default"}}, true]}}, |
|
|
{{"==": [{{"var": "bureau.suit_filed"}}, true]}} |
|
|
]}}, |
|
|
"explanation": "Rejects applications with wilful default or suit filed status.", |
|
|
"used_keys": ["bureau.wilful_default", "bureau.suit_filed"], |
|
|
"confidence": 0.92 |
|
|
}} |
|
|
|
|
|
User: "Approve if age between 25 and 60" |
|
|
Output: |
|
|
{{ |
|
|
"json_logic": {{"and": [ |
|
|
{{">=": [{{"var": "primary_applicant.age"}}, 25]}}, |
|
|
{{"<=": [{{"var": "primary_applicant.age"}}, 60]}} |
|
|
]}}, |
|
|
"explanation": "Approves when primary applicant age is between 25 and 60 years inclusive.", |
|
|
"used_keys": ["primary_applicant.age"], |
|
|
"confidence": 0.90 |
|
|
}}""" |
|
|
|
|
|
return system_prompt |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to build system prompt: {str(e)}") |
|
|
return "" |
|
|
|
|
|
def generate_single_rule(self, prompt, system_prompt, temperature=0.2): |
|
|
|
|
|
|
|
|
try: |
|
|
response = self.client.chat.completions.create( |
|
|
model=self.model, |
|
|
messages=[ |
|
|
{"role": "system", "content": system_prompt}, |
|
|
{"role": "user", "content": prompt} |
|
|
], |
|
|
temperature=temperature, |
|
|
max_tokens=800, |
|
|
response_format={"type": "json_object"} |
|
|
) |
|
|
|
|
|
content = response.choices[0].message.content |
|
|
result = json.loads(content) |
|
|
|
|
|
|
|
|
if not all(k in result for k in ['json_logic', 'explanation', 'used_keys']): |
|
|
logger.warning("LLM response missing required fields") |
|
|
raise ValueError("Missing required fields in LLM response") |
|
|
|
|
|
|
|
|
if 'confidence' not in result: |
|
|
result['confidence'] = 0.8 |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Rule generation failed: {str(e)}") |
|
|
return None |
|
|
|
|
|
def validate_rule(self, rule, available_keys): |
|
|
|
|
|
|
|
|
try: |
|
|
if not rule: |
|
|
return False, "Empty rule" |
|
|
|
|
|
|
|
|
def extract_vars(obj): |
|
|
vars_found = [] |
|
|
if isinstance(obj, dict): |
|
|
if "var" in obj: |
|
|
vars_found.append(obj["var"]) |
|
|
for value in obj.values(): |
|
|
vars_found.extend(extract_vars(value)) |
|
|
elif isinstance(obj, list): |
|
|
for item in obj: |
|
|
vars_found.extend(extract_vars(item)) |
|
|
return vars_found |
|
|
|
|
|
used_vars = extract_vars(rule) |
|
|
|
|
|
|
|
|
invalid_vars = [v for v in used_vars if v not in available_keys] |
|
|
if invalid_vars: |
|
|
logger.warning(f"Rule uses invalid keys: {invalid_vars}") |
|
|
return False, f"Invalid keys used: {invalid_vars}" |
|
|
|
|
|
return True, "" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Rule validation failed: {str(e)}") |
|
|
return False, str(e) |
|
|
|
|
|
def test_rule_on_mocks(self, rule, num_samples=5): |
|
|
|
|
|
|
|
|
try: |
|
|
if not rule: |
|
|
return 0.0 |
|
|
|
|
|
successes = 0 |
|
|
samples = MOCK_STORE_SAMPLES[:num_samples] |
|
|
|
|
|
for sample in samples: |
|
|
try: |
|
|
|
|
|
result = jsonLogic(rule, sample) |
|
|
successes += 1 |
|
|
except Exception as e: |
|
|
|
|
|
logger.debug(f"Rule test failed on sample: {str(e)}") |
|
|
continue |
|
|
|
|
|
success_rate = successes / len(samples) if samples else 0.0 |
|
|
return success_rate |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Mock testing failed: {str(e)}") |
|
|
return 0.0 |
|
|
|
|
|
def self_consistency_vote(self, variants): |
|
|
|
|
|
|
|
|
try: |
|
|
if not variants: |
|
|
return None |
|
|
|
|
|
if len(variants) == 1: |
|
|
return variants[0] |
|
|
|
|
|
scored_variants = [] |
|
|
for variant in variants: |
|
|
score = 0.0 |
|
|
|
|
|
|
|
|
score += variant.get('confidence', 0.5) * 0.4 |
|
|
|
|
|
|
|
|
validation_rate = variant.get('validation_rate', 0.0) |
|
|
score += validation_rate * 0.4 |
|
|
|
|
|
|
|
|
rule_str = json.dumps(variant['json_logic']) |
|
|
complexity_penalty = min(len(rule_str) / 500, 0.2) |
|
|
score += (0.2 - complexity_penalty) |
|
|
|
|
|
scored_variants.append((score, variant)) |
|
|
|
|
|
|
|
|
scored_variants.sort(key=lambda x: x[0], reverse=True) |
|
|
|
|
|
scores_str = [f'{s:.3f}' for s, _ in scored_variants] |
|
|
logger.debug(f"Self-consistency scores: {scores_str}") |
|
|
|
|
|
return scored_variants[0][1] |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Self-consistency voting failed: {str(e)}") |
|
|
return variants[0] if variants else None |
|
|
|
|
|
def generate_rule(self, prompt, key_mappings, policy_context, num_variants=3): |
|
|
""" |
|
|
Main method - generates rule with self-consistency |
|
|
tries multiple times with different temperatures and picks best |
|
|
""" |
|
|
try: |
|
|
logger.info(f"Generating {num_variants} rule variants...") |
|
|
|
|
|
|
|
|
available_keys = [m.mapped_to for m in key_mappings] |
|
|
|
|
|
|
|
|
system_prompt = self.build_system_prompt(available_keys, policy_context) |
|
|
|
|
|
|
|
|
variants = [] |
|
|
temperatures = [0.1, 0.3, 0.5][:num_variants] |
|
|
|
|
|
for i, temp in enumerate(temperatures): |
|
|
logger.debug(f"Generating variant {i+1} with temp={temp}...") |
|
|
|
|
|
result = self.generate_single_rule(prompt, system_prompt, temperature=temp) |
|
|
|
|
|
if result: |
|
|
|
|
|
is_valid, error_msg = self.validate_rule( |
|
|
result['json_logic'], |
|
|
available_keys |
|
|
) |
|
|
|
|
|
if not is_valid: |
|
|
logger.warning(f"Variant {i+1} validation failed: {error_msg}") |
|
|
continue |
|
|
|
|
|
|
|
|
validation_rate = self.test_rule_on_mocks(result['json_logic']) |
|
|
result['validation_rate'] = validation_rate |
|
|
|
|
|
logger.info(f"Variant {i+1}: conf={result['confidence']:.3f}, val={validation_rate:.3f}") |
|
|
|
|
|
variants.append(result) |
|
|
|
|
|
if not variants: |
|
|
logger.error("Failed to generate any valid rule variants") |
|
|
raise ValueError("Failed to generate any valid rule variants") |
|
|
|
|
|
|
|
|
best_rule = self.self_consistency_vote(variants) |
|
|
|
|
|
logger.success(f"Selected best rule") |
|
|
|
|
|
return best_rule |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Rule generation failed: {str(e)}") |
|
|
raise |
|
|
|
|
|
def calculate_confidence_score(self, rule_result, key_mappings, policy_relevance): |
|
|
"""Calculate overall confidence from multiple factors""" |
|
|
try: |
|
|
|
|
|
avg_key_sim = sum(m.similarity for m in key_mappings) / len(key_mappings) if key_mappings else 0.0 |
|
|
|
|
|
|
|
|
policy_score = policy_relevance |
|
|
|
|
|
|
|
|
llm_confidence = rule_result.get('confidence', 0.8) |
|
|
validation_rate = rule_result.get('validation_rate', 0.8) |
|
|
generation_score = (llm_confidence + validation_rate) / 2 |
|
|
|
|
|
|
|
|
confidence = ( |
|
|
avg_key_sim * 0.4 + |
|
|
policy_score * 0.3 + |
|
|
generation_score * 0.3 |
|
|
) |
|
|
|
|
|
|
|
|
return float(min(max(confidence, 0.0), 1.0)) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Confidence calculation failed: {str(e)}") |
|
|
return 0.5 |
|
|
|