# anomaly-detection-api / scripts / model_versioning.py
# Commit 68c41e5 by Senum2001:
#   Deep fix: Add comprehensive state validation before logging training
#   cycles to prevent type errors
"""
Model Versioning & Training History System
Tracks model parameters, thresholds, and training evolution in Supabase
"""
import os
import json
from datetime import datetime
from typing import Dict, Any, Optional, List
from supabase import create_client, Client
import uuid
# Supabase configuration.
# The URL falls back to a hard-coded project endpoint; the service-role key
# defaults to "" when the env var is unset.
SUPABASE_URL = os.getenv("SUPABASE_URL", "https://xbcgrpqiibicestnhytt.supabase.co")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "")
# Initialize Supabase client at import time — a module-level side effect.
# NOTE(review): create_client may reject an empty key depending on the
# supabase-py version; confirm behavior when SUPABASE_SERVICE_ROLE_KEY is unset.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
class ModelVersionTracker:
    """
    Tracks model versions, parameters, and training history.

    Snapshots of model configuration are written to the Supabase
    ``model_versions`` table and before/after training comparisons to the
    ``training_history`` table (see CREATE_TABLES_SQL in this module for the
    expected schema). All database failures are reported via print() and
    degrade to None/empty results rather than raising.
    """

    def __init__(self):
        """Initialize the tracker with the module-level Supabase client."""
        self.supabase = supabase

    @staticmethod
    def _load_json_file(path: str) -> Dict[str, Any]:
        """
        Load a JSON object from *path*.

        Returns {} when the file is missing, unreadable, contains invalid
        JSON, or contains valid JSON that is not an object — so callers can
        always chain .get() safely.
        """
        if not os.path.exists(path):
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            print(f"[Model Versioning] Warning: could not read {path}: {e}")
            return {}
        return data if isinstance(data, dict) else {}

    @staticmethod
    def _is_missing_table_error(e: Exception) -> bool:
        """True when a Supabase error indicates the table was never created."""
        msg = str(e)
        return "does not exist" in msg or "PGRST205" in msg

    def get_current_model_state(self) -> Dict[str, Any]:
        """
        Get current model parameters and thresholds.

        Returns:
            Dictionary containing the current model state: static PatchCore
            configuration, detection/classification thresholds, learned
            adjustments from feedback, and training metadata.
        """
        # Learned adjustments and training state live in optional on-disk
        # files; absence or corruption simply yields empty defaults.
        adjustments = self._load_json_file("model_adjustments.json")
        training_state = self._load_json_file("feedback_training_state.json")
        # Guard against "training_runs" being missing or null.
        training_runs = training_state.get("training_runs") or []
        model_state = {
            "version_id": str(uuid.uuid4()),
            "timestamp": datetime.now().isoformat(),
            # Model Configuration
            "model_architecture": "PatchCore",
            "backbone": "Wide ResNet-50",
            "layers": ["layer2", "layer3"],
            "input_size": [256, 256],
            # Detection Thresholds
            "anomaly_threshold": 128,  # Binary mask threshold
            "confidence_range": [0.3, 0.99],
            "min_detection_size": 100,  # Minimum pixels for detection
            # Classification Thresholds (HSV ranges per severity color)
            "red_color_threshold": {
                "hue_range": [0, 10, 170, 180],
                "saturation_min": 100,
                "value_min": 100
            },
            "yellow_color_threshold": {
                "hue_range": [20, 30],
                "saturation_min": 100,
                "value_min": 100
            },
            "orange_color_threshold": {
                "hue_range": [10, 20],
                "saturation_min": 100,
                "value_min": 100
            },
            # Post-processing Parameters
            "merge_distance_threshold": 20,
            "iou_threshold": 0.4,
            "min_contour_area": 100,
            # Learned Adjustments (from feedback)
            "false_positive_rate": adjustments.get("fp_rate", 0.0),
            "false_negative_rate": adjustments.get("fn_rate", 0.0),
            "threshold_recommendation": adjustments.get("recommendation", "Not yet calculated"),
            # Training Metadata
            "total_feedback_processed": training_state.get("total_feedback_processed", 0),
            "last_training_time": training_state.get("last_training_time"),
            "training_runs_count": len(training_runs)
        }
        return model_state

    def log_model_version(self, model_state: Dict[str, Any]) -> Optional[str]:
        """
        Log current model version to Supabase.

        Args:
            model_state: Model parameters, typically produced by
                get_current_model_state().

        Returns:
            Version ID if successful, None otherwise.
        """
        # Validate up front so a bad caller cannot trigger a TypeError, and
        # capture the version id once so the except branch cannot itself
        # raise a KeyError on a partial state.
        if not isinstance(model_state, dict):
            print(f"[Model Versioning] Error: Invalid state type: {type(model_state)}")
            return None
        version_id = model_state.get("version_id") or str(uuid.uuid4())
        try:
            # Prepare record for database; .get() keeps a partially-populated
            # state from crashing the logger (missing fields become null).
            record = {
                "version_id": version_id,
                "timestamp": model_state.get("timestamp", datetime.now().isoformat()),
                "model_architecture": model_state.get("model_architecture"),
                "backbone": model_state.get("backbone"),
                "parameters": {
                    "layers": model_state.get("layers"),
                    "input_size": model_state.get("input_size"),
                    "anomaly_threshold": model_state.get("anomaly_threshold"),
                    "confidence_range": model_state.get("confidence_range"),
                    "min_detection_size": model_state.get("min_detection_size")
                },
                "thresholds": {
                    "red_color": model_state.get("red_color_threshold"),
                    "yellow_color": model_state.get("yellow_color_threshold"),
                    "orange_color": model_state.get("orange_color_threshold"),
                    "merge_distance": model_state.get("merge_distance_threshold"),
                    "iou": model_state.get("iou_threshold"),
                    "min_contour_area": model_state.get("min_contour_area")
                },
                "learned_adjustments": {
                    "false_positive_rate": model_state.get("false_positive_rate", 0.0),
                    "false_negative_rate": model_state.get("false_negative_rate", 0.0),
                    "recommendation": model_state.get("threshold_recommendation", "")
                },
                "training_metadata": {
                    "total_feedback_processed": model_state.get("total_feedback_processed", 0),
                    "last_training_time": model_state.get("last_training_time"),
                    "training_runs_count": model_state.get("training_runs_count", 0)
                },
                "is_active": True
            }
            # Insert into database
            response = self.supabase.table('model_versions').insert(record).execute()
            if response.data:
                print(f"[Model Versioning] Logged version {version_id}")
                return version_id
            print("[Model Versioning] Failed to log version")
            return None
        except Exception as e:
            # If table doesn't exist, just warn but don't fail
            if self._is_missing_table_error(e):
                print("[Model Versioning] Warning: Tables not created yet. Run setup_model_versioning.py to create them.")
                print(f"[Model Versioning] Version {version_id} tracked locally only.")
            else:
                print(f"[Model Versioning] Error logging version: {e}")
            return None

    def log_training_cycle(self,
                           before_state: Dict[str, Any],
                           after_state: Dict[str, Any],
                           feedback_count: int,
                           patterns: Dict[str, Any],
                           performance_metrics: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """
        Log a training cycle with before/after comparison.

        Args:
            before_state: Model state before training
            after_state: Model state after training
            feedback_count: Number of feedback samples processed
            patterns: Pattern analysis from feedback
            performance_metrics: Optional performance metrics

        Returns:
            Training cycle ID if successful, None otherwise.
        """
        # Validate inputs - ensure states are dictionaries
        if not isinstance(before_state, dict) or not isinstance(after_state, dict):
            print(f"[Training History] Error: Invalid state types - before: {type(before_state)}, after: {type(after_state)}")
            return None
        # A missing/invalid pattern analysis degrades to empty defaults
        # instead of raising AttributeError on .get() below.
        if not isinstance(patterns, dict):
            patterns = {}
        try:
            cycle_id = str(uuid.uuid4())
            # Calculate parameter changes
            parameter_changes = self._calculate_parameter_changes(before_state, after_state)
            record = {
                "cycle_id": cycle_id,
                "timestamp": datetime.now().isoformat(),
                # .get() tolerates states that were never assigned a version.
                "before_version_id": before_state.get("version_id"),
                "after_version_id": after_state.get("version_id"),
                "feedback_samples_processed": feedback_count,
                # Pattern Analysis
                "feedback_patterns": {
                    "label_changes": patterns.get("label_changes", []),
                    "bbox_adjustments": patterns.get("bbox_adjustments", []),
                    "false_positives": patterns.get("false_positives", 0),
                    "false_negatives": patterns.get("false_negatives", 0)
                },
                # Parameter Changes
                "parameter_changes": parameter_changes,
                # Performance Metrics (if available)
                "performance_metrics": performance_metrics or {
                    "accuracy_improvement": "Not yet calculated",
                    "precision_improvement": "Not yet calculated",
                    "recall_improvement": "Not yet calculated"
                },
                # Recommendations
                "threshold_recommendation": after_state.get("threshold_recommendation", ""),
                # Status
                "status": "completed",
                "notes": f"Processed {feedback_count} feedback samples"
            }
            # Insert into database
            response = self.supabase.table('training_history').insert(record).execute()
            if response.data:
                print(f"[Training History] Logged cycle {cycle_id}")
                return cycle_id
            print("[Training History] Failed to log cycle")
            return None
        except Exception as e:
            # If table doesn't exist, just warn but don't fail
            if self._is_missing_table_error(e):
                print("[Training History] Warning: Tables not created yet.")
                print("[Training History] Cycle tracked locally only.")
            else:
                print(f"[Training History] Error logging cycle: {e}")
            return None

    def _calculate_parameter_changes(self, before: Dict[str, Any], after: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calculate what changed between before and after states.

        Uses .get() with defaults so partially-populated states produce a
        partial diff instead of raising KeyError (which would defeat the
        validation done in log_training_cycle).
        """
        changes: Dict[str, Any] = {}
        # Numeric fields: record before/after/delta only when they differ.
        for key in ("false_positive_rate",
                    "false_negative_rate",
                    "total_feedback_processed"):
            old_value = before.get(key, 0)
            new_value = after.get(key, 0)
            if old_value != new_value:
                changes[key] = {
                    "before": old_value,
                    "after": new_value,
                    "delta": new_value - old_value
                }
        # Recommendation is free text: no delta, just before/after.
        old_rec = before.get("threshold_recommendation", "")
        new_rec = after.get("threshold_recommendation", "")
        if old_rec != new_rec:
            changes["threshold_recommendation"] = {
                "before": old_rec,
                "after": new_rec
            }
        return changes

    def get_version_history(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Get recent model version history.

        Args:
            limit: Maximum number of versions to retrieve

        Returns:
            List of model versions, newest first; empty list on error.
        """
        try:
            response = self.supabase.table('model_versions')\
                .select('*')\
                .order('timestamp', desc=True)\
                .limit(limit)\
                .execute()
            return response.data if response.data else []
        except Exception as e:
            print(f"[Model Versioning] Error fetching history: {e}")
            return []

    def get_training_history(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Get recent training cycles.

        Args:
            limit: Maximum number of cycles to retrieve

        Returns:
            List of training cycles, newest first; empty list on error.
        """
        try:
            response = self.supabase.table('training_history')\
                .select('*')\
                .order('timestamp', desc=True)\
                .limit(limit)\
                .execute()
            return response.data if response.data else []
        except Exception as e:
            print(f"[Training History] Error fetching history: {e}")
            return []

    def get_active_version(self) -> Optional[Dict[str, Any]]:
        """
        Get currently active model version.

        Returns:
            The most recent version row with is_active=True, or None.
        """
        try:
            response = self.supabase.table('model_versions')\
                .select('*')\
                .eq('is_active', True)\
                .order('timestamp', desc=True)\
                .limit(1)\
                .execute()
            if response.data:
                return response.data[0]
            return None
        except Exception as e:
            print(f"[Model Versioning] Error fetching active version: {e}")
            return None

    def generate_comparison_table(self, version_ids: List[str]) -> str:
        """
        Generate a comparison table between model versions.

        Args:
            version_ids: List of version IDs to compare

        Returns:
            Formatted comparison table string, "No versions found" when no
            id resolves, or an error message on failure.
        """
        try:
            versions = []
            for vid in version_ids:
                response = self.supabase.table('model_versions')\
                    .select('*')\
                    .eq('version_id', vid)\
                    .execute()
                if response.data:
                    versions.append(response.data[0])
            if not versions:
                return "No versions found"
            # Generate comparison table
            table = "\n" + "=" * 100 + "\n"
            table += "MODEL VERSION COMPARISON\n"
            table += "=" * 100 + "\n\n"
            for i, v in enumerate(versions):
                # Nested JSONB columns may be null on old rows; default to {}
                # so a sparse row prints zeros instead of raising.
                adjustments = v.get('learned_adjustments') or {}
                metadata = v.get('training_metadata') or {}
                table += f"Version {i+1}: {v['version_id'][:8]}...\n"
                table += f"Timestamp: {v['timestamp']}\n"
                table += f"Architecture: {v['model_architecture']} ({v['backbone']})\n"
                table += f"False Positive Rate: {adjustments.get('false_positive_rate', 0.0):.2%}\n"
                table += f"False Negative Rate: {adjustments.get('false_negative_rate', 0.0):.2%}\n"
                table += f"Feedback Processed: {metadata.get('total_feedback_processed', 0)}\n"
                table += f"Recommendation: {adjustments.get('recommendation', '')}\n"
                table += "-" * 100 + "\n"
            return table
        except Exception as e:
            print(f"[Model Versioning] Error generating comparison: {e}")
            return f"Error: {e}"
def initialize_model_tracker():
    """Create and return a fresh ModelVersionTracker instance."""
    tracker = ModelVersionTracker()
    return tracker
# SQL for creating the required tables (run in Supabase Dashboard)
CREATE_TABLES_SQL = """
-- Table: model_versions
-- Stores each model version with parameters and thresholds
CREATE TABLE IF NOT EXISTS model_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
version_id VARCHAR(255) UNIQUE NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
model_architecture VARCHAR(100) NOT NULL,
backbone VARCHAR(100),
parameters JSONB,
thresholds JSONB,
learned_adjustments JSONB,
training_metadata JSONB,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_model_versions_timestamp ON model_versions(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_model_versions_active ON model_versions(is_active) WHERE is_active = TRUE;
-- Table: training_history
-- Stores training cycle information with before/after comparisons
CREATE TABLE IF NOT EXISTS training_history (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
cycle_id VARCHAR(255) UNIQUE NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
before_version_id VARCHAR(255),
after_version_id VARCHAR(255),
feedback_samples_processed INTEGER,
feedback_patterns JSONB,
parameter_changes JSONB,
performance_metrics JSONB,
threshold_recommendation TEXT,
status VARCHAR(50),
notes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_training_history_timestamp ON training_history(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_training_history_status ON training_history(status);
-- Foreign key constraints
ALTER TABLE training_history
ADD CONSTRAINT fk_before_version
FOREIGN KEY (before_version_id)
REFERENCES model_versions(version_id);
ALTER TABLE training_history
ADD CONSTRAINT fk_after_version
FOREIGN KEY (after_version_id)
REFERENCES model_versions(version_id);
"""