Spaces:
Sleeping
Sleeping
| """ | |
| Database Module for NAVADA - SQLite storage for faces and objects | |
| Handles storage, retrieval, and management of custom recognition data | |
| """ | |
| import sqlite3 | |
| import numpy as np | |
| import cv2 | |
| from datetime import datetime | |
| import json | |
| import base64 | |
| from typing import List, Dict, Optional, Tuple | |
| import logging | |
| from pathlib import Path | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
| class NAVADADatabase: | |
| """Database manager for storing faces, objects, and recognition data""" | |
| def __init__(self, db_path: str = "navada_recognition.db"): | |
| """ | |
| Initialize database connection and create tables | |
| Args: | |
| db_path: Path to SQLite database file | |
| """ | |
| self.db_path = db_path | |
| self.init_database() | |
| def init_database(self): | |
| """Create database tables if they don't exist""" | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| # Create faces table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS faces ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL, | |
| encoding BLOB NOT NULL, | |
| image_data BLOB, | |
| confidence REAL DEFAULT 0.0, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| metadata TEXT, | |
| is_active BOOLEAN DEFAULT 1 | |
| ) | |
| """) | |
| # Create objects table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS objects ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| label TEXT NOT NULL, | |
| category TEXT, | |
| features BLOB, | |
| image_data BLOB, | |
| bounding_box TEXT, | |
| confidence REAL DEFAULT 0.0, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| metadata TEXT, | |
| is_active BOOLEAN DEFAULT 1 | |
| ) | |
| """) | |
| # Create detection history table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS detection_history ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| session_id TEXT, | |
| image_data BLOB, | |
| detections TEXT, | |
| face_matches TEXT, | |
| object_matches TEXT, | |
| confidence_scores TEXT, | |
| processing_time REAL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| metadata TEXT | |
| ) | |
| """) | |
| # Create knowledge base for RAG | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS knowledge_base ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| entity_type TEXT NOT NULL, | |
| entity_id INTEGER NOT NULL, | |
| content TEXT NOT NULL, | |
| embedding BLOB, | |
| keywords TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| # Create training corrections table for active learning | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS training_corrections ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| image_path TEXT, | |
| image_crop BLOB NOT NULL, | |
| bbox_coords TEXT NOT NULL, | |
| yolo_prediction TEXT NOT NULL, | |
| yolo_confidence REAL NOT NULL, | |
| correct_label TEXT NOT NULL, | |
| user_feedback TEXT, | |
| difficulty_score REAL DEFAULT 0.0, | |
| validated BOOLEAN DEFAULT 0, | |
| used_for_training BOOLEAN DEFAULT 0, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| session_id TEXT, | |
| metadata TEXT | |
| ) | |
| """) | |
| # Create custom model versions table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS model_versions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| version_name TEXT NOT NULL UNIQUE, | |
| model_path TEXT NOT NULL, | |
| accuracy REAL, | |
| precision_score REAL, | |
| recall_score REAL, | |
| f1_score REAL, | |
| training_samples INTEGER DEFAULT 0, | |
| validation_samples INTEGER DEFAULT 0, | |
| training_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| is_active BOOLEAN DEFAULT 0, | |
| performance_metrics TEXT, | |
| training_config TEXT, | |
| notes TEXT | |
| ) | |
| """) | |
| # Create custom classes mapping | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS custom_classes ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| class_name TEXT NOT NULL UNIQUE, | |
| yolo_class TEXT, | |
| sample_count INTEGER DEFAULT 0, | |
| confidence_threshold REAL DEFAULT 0.5, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| is_active BOOLEAN DEFAULT 1, | |
| description TEXT | |
| ) | |
| """) | |
| # Create indexes for better performance | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_faces_name ON faces(name)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_objects_label ON objects(label)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_history_session ON detection_history(session_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_entity ON knowledge_base(entity_type, entity_id)") | |
| conn.commit() | |
| logger.info("Database initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Database initialization failed: {e}") | |
| raise | |
| def add_face(self, name: str, face_encoding: np.ndarray, image: np.ndarray, | |
| confidence: float = 0.0, metadata: Dict = None) -> int: | |
| """ | |
| Add a new face to the database | |
| Args: | |
| name: Person's name | |
| face_encoding: Face encoding vector | |
| image: Face image array | |
| confidence: Recognition confidence | |
| metadata: Additional metadata | |
| Returns: | |
| Face ID in database | |
| """ | |
| try: | |
| # Encode image to base64 | |
| _, buffer = cv2.imencode('.jpg', image) | |
| image_data = base64.b64encode(buffer).decode('utf-8') | |
| # Serialize face encoding | |
| encoding_data = face_encoding.tobytes() | |
| # Convert metadata to JSON | |
| metadata_json = json.dumps(metadata) if metadata else None | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO faces (name, encoding, image_data, confidence, metadata) | |
| VALUES (?, ?, ?, ?, ?) | |
| """, (name, encoding_data, image_data, confidence, metadata_json)) | |
| face_id = cursor.lastrowid | |
| conn.commit() | |
| # Add to knowledge base | |
| self.add_knowledge_entry("face", face_id, f"Person named {name}") | |
| logger.info(f"Added face for {name} with ID {face_id}") | |
| return face_id | |
| except Exception as e: | |
| logger.error(f"Failed to add face: {e}") | |
| raise | |
| def add_object(self, label: str, category: str, features: np.ndarray, | |
| image: np.ndarray, bounding_box: Tuple, confidence: float = 0.0, | |
| metadata: Dict = None) -> int: | |
| """ | |
| Add a new custom object to the database | |
| Args: | |
| label: Object label/name | |
| category: Object category | |
| features: Feature vector | |
| image: Object image | |
| bounding_box: (x, y, w, h) bounding box | |
| confidence: Detection confidence | |
| metadata: Additional metadata | |
| Returns: | |
| Object ID in database | |
| """ | |
| try: | |
| # Encode image to base64 | |
| _, buffer = cv2.imencode('.jpg', image) | |
| image_data = base64.b64encode(buffer).decode('utf-8') | |
| # Serialize features | |
| features_data = features.tobytes() if features is not None else None | |
| # Serialize bounding box | |
| bbox_json = json.dumps(bounding_box) | |
| # Convert metadata to JSON | |
| metadata_json = json.dumps(metadata) if metadata else None | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO objects (label, category, features, image_data, | |
| bounding_box, confidence, metadata) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| """, (label, category, features_data, image_data, bbox_json, | |
| confidence, metadata_json)) | |
| object_id = cursor.lastrowid | |
| conn.commit() | |
| # Add to knowledge base | |
| self.add_knowledge_entry("object", object_id, | |
| f"{label} - {category} object") | |
| logger.info(f"Added object {label} with ID {object_id}") | |
| return object_id | |
| except Exception as e: | |
| logger.error(f"Failed to add object: {e}") | |
| raise | |
| def get_faces(self, active_only: bool = True) -> List[Dict]: | |
| """Get all faces from database""" | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| query = "SELECT * FROM faces" | |
| if active_only: | |
| query += " WHERE is_active = 1" | |
| cursor.execute(query) | |
| rows = cursor.fetchall() | |
| faces = [] | |
| for row in rows: | |
| face = { | |
| 'id': row[0], | |
| 'name': row[1], | |
| 'encoding': np.frombuffer(row[2], dtype=np.float64), | |
| 'confidence': row[4], | |
| 'created_at': row[5], | |
| 'metadata': json.loads(row[7]) if row[7] else {} | |
| } | |
| faces.append(face) | |
| return faces | |
| except Exception as e: | |
| logger.error(f"Failed to get faces: {e}") | |
| return [] | |
| def get_objects(self, category: str = None, active_only: bool = True) -> List[Dict]: | |
| """Get objects from database""" | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| query = "SELECT * FROM objects" | |
| params = [] | |
| conditions = [] | |
| if active_only: | |
| conditions.append("is_active = 1") | |
| if category: | |
| conditions.append("category = ?") | |
| params.append(category) | |
| if conditions: | |
| query += " WHERE " + " AND ".join(conditions) | |
| cursor.execute(query, params) | |
| rows = cursor.fetchall() | |
| objects = [] | |
| for row in rows: | |
| obj = { | |
| 'id': row[0], | |
| 'label': row[1], | |
| 'category': row[2], | |
| 'features': np.frombuffer(row[3], dtype=np.float64) if row[3] else None, | |
| 'bounding_box': json.loads(row[5]) if row[5] else None, | |
| 'confidence': row[6], | |
| 'created_at': row[7], | |
| 'metadata': json.loads(row[9]) if row[9] else {} | |
| } | |
| objects.append(obj) | |
| return objects | |
| except Exception as e: | |
| logger.error(f"Failed to get objects: {e}") | |
| return [] | |
| def save_detection_history(self, session_id: str, image: np.ndarray, | |
| detections: List, face_matches: List = None, | |
| object_matches: List = None, confidence_scores: Dict = None, | |
| processing_time: float = 0.0, metadata: Dict = None) -> int: | |
| """Save detection results to history""" | |
| try: | |
| # Encode image | |
| _, buffer = cv2.imencode('.jpg', image) | |
| image_data = base64.b64encode(buffer).decode('utf-8') | |
| # Serialize data | |
| detections_json = json.dumps(detections) | |
| face_matches_json = json.dumps(face_matches) if face_matches else None | |
| object_matches_json = json.dumps(object_matches) if object_matches else None | |
| confidence_json = json.dumps(confidence_scores) if confidence_scores else None | |
| metadata_json = json.dumps(metadata) if metadata else None | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO detection_history | |
| (session_id, image_data, detections, face_matches, object_matches, | |
| confidence_scores, processing_time, metadata) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| """, (session_id, image_data, detections_json, face_matches_json, | |
| object_matches_json, confidence_json, processing_time, metadata_json)) | |
| history_id = cursor.lastrowid | |
| conn.commit() | |
| logger.info(f"Saved detection history with ID {history_id}") | |
| return history_id | |
| except Exception as e: | |
| logger.error(f"Failed to save detection history: {e}") | |
| raise | |
| def add_knowledge_entry(self, entity_type: str, entity_id: int, content: str, | |
| keywords: List[str] = None): | |
| """Add entry to knowledge base for RAG""" | |
| try: | |
| keywords_json = json.dumps(keywords) if keywords else None | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO knowledge_base (entity_type, entity_id, content, keywords) | |
| VALUES (?, ?, ?, ?) | |
| """, (entity_type, entity_id, content, keywords_json)) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"Failed to add knowledge entry: {e}") | |
| def search_knowledge(self, query: str, entity_type: str = None) -> List[Dict]: | |
| """Search knowledge base for RAG""" | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| # Simple text search (can be enhanced with embeddings) | |
| search_query = f"%{query.lower()}%" | |
| if entity_type: | |
| cursor.execute(""" | |
| SELECT * FROM knowledge_base | |
| WHERE entity_type = ? AND LOWER(content) LIKE ? | |
| ORDER BY created_at DESC LIMIT 10 | |
| """, (entity_type, search_query)) | |
| else: | |
| cursor.execute(""" | |
| SELECT * FROM knowledge_base | |
| WHERE LOWER(content) LIKE ? | |
| ORDER BY created_at DESC LIMIT 10 | |
| """, (search_query,)) | |
| rows = cursor.fetchall() | |
| results = [] | |
| for row in rows: | |
| result = { | |
| 'id': row[0], | |
| 'entity_type': row[1], | |
| 'entity_id': row[2], | |
| 'content': row[3], | |
| 'keywords': json.loads(row[5]) if row[5] else [], | |
| 'created_at': row[6] | |
| } | |
| results.append(result) | |
| return results | |
| except Exception as e: | |
| logger.error(f"Knowledge search failed: {e}") | |
| return [] | |
| def get_stats(self) -> Dict: | |
| """Get database statistics""" | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| # Count faces | |
| cursor.execute("SELECT COUNT(*) FROM faces WHERE is_active = 1") | |
| face_count = cursor.fetchone()[0] | |
| # Count objects | |
| cursor.execute("SELECT COUNT(*) FROM objects WHERE is_active = 1") | |
| object_count = cursor.fetchone()[0] | |
| # Count history entries | |
| cursor.execute("SELECT COUNT(*) FROM detection_history") | |
| history_count = cursor.fetchone()[0] | |
| # Get recent activity | |
| cursor.execute(""" | |
| SELECT COUNT(*) FROM detection_history | |
| WHERE created_at > datetime('now', '-7 days') | |
| """) | |
| recent_detections = cursor.fetchone()[0] | |
| return { | |
| 'faces': face_count, | |
| 'objects': object_count, | |
| 'total_detections': history_count, | |
| 'recent_detections': recent_detections, | |
| 'database_size': Path(self.db_path).stat().st_size if Path(self.db_path).exists() else 0 | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get stats: {e}") | |
| return {} | |
| # Training Corrections Methods for Active Learning | |
| def save_correction(self, image_crop: np.ndarray, bbox_coords: List[float], | |
| yolo_prediction: str, yolo_confidence: float, | |
| correct_label: str, user_feedback: str = "", | |
| session_id: str = "") -> bool: | |
| """ | |
| Save a user correction for training | |
| Args: | |
| image_crop: Cropped image of the detected object | |
| bbox_coords: [x1, y1, x2, y2] bounding box coordinates | |
| yolo_prediction: Original YOLO predicted label | |
| yolo_confidence: Original YOLO confidence score | |
| correct_label: User-provided correct label | |
| user_feedback: Optional user feedback text | |
| session_id: Session identifier | |
| Returns: | |
| bool: Success status | |
| """ | |
| try: | |
| # Convert image to bytes | |
| _, buffer = cv2.imencode('.jpg', image_crop) | |
| image_bytes = buffer.tobytes() | |
| # Calculate difficulty score (lower confidence = higher difficulty) | |
| difficulty_score = 1.0 - yolo_confidence | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO training_corrections | |
| (image_crop, bbox_coords, yolo_prediction, yolo_confidence, | |
| correct_label, user_feedback, difficulty_score, session_id, metadata) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| image_bytes, | |
| json.dumps(bbox_coords), | |
| yolo_prediction, | |
| yolo_confidence, | |
| correct_label, | |
| user_feedback, | |
| difficulty_score, | |
| session_id, | |
| json.dumps({ | |
| 'timestamp': datetime.now().isoformat(), | |
| 'image_shape': image_crop.shape, | |
| 'correction_type': 'user_feedback' | |
| }) | |
| )) | |
| # Update or create custom class entry | |
| cursor.execute(""" | |
| INSERT OR IGNORE INTO custom_classes (class_name, yolo_class, sample_count) | |
| VALUES (?, ?, 0) | |
| """, (correct_label, yolo_prediction)) | |
| cursor.execute(""" | |
| UPDATE custom_classes | |
| SET sample_count = sample_count + 1 | |
| WHERE class_name = ? | |
| """, (correct_label,)) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save correction: {e}") | |
| return False | |
| def get_training_data(self, class_name: str = None, limit: int = 1000, | |
| validated_only: bool = False) -> List[Dict]: | |
| """ | |
| Retrieve training data for model training | |
| Args: | |
| class_name: Filter by specific class (optional) | |
| limit: Maximum number of samples to return | |
| validated_only: Only return validated corrections | |
| Returns: | |
| List of training samples | |
| """ | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| query = """ | |
| SELECT id, image_crop, bbox_coords, yolo_prediction, | |
| yolo_confidence, correct_label, difficulty_score, | |
| created_at, metadata | |
| FROM training_corrections | |
| WHERE 1=1 | |
| """ | |
| params = [] | |
| if class_name: | |
| query += " AND correct_label = ?" | |
| params.append(class_name) | |
| if validated_only: | |
| query += " AND validated = 1" | |
| query += " ORDER BY difficulty_score DESC, created_at DESC LIMIT ?" | |
| params.append(limit) | |
| cursor.execute(query, params) | |
| rows = cursor.fetchall() | |
| training_data = [] | |
| for row in rows: | |
| # Decode image | |
| image_bytes = row[1] | |
| image_array = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| training_data.append({ | |
| 'id': row[0], | |
| 'image': image_array, | |
| 'bbox_coords': json.loads(row[2]), | |
| 'yolo_prediction': row[3], | |
| 'yolo_confidence': row[4], | |
| 'correct_label': row[5], | |
| 'difficulty_score': row[6], | |
| 'created_at': row[7], | |
| 'metadata': json.loads(row[8]) if row[8] else {} | |
| }) | |
| return training_data | |
| except Exception as e: | |
| logger.error(f"Failed to get training data: {e}") | |
| return [] | |
| def get_training_stats(self) -> Dict: | |
| """Get statistics about training corrections""" | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| # Total corrections | |
| cursor.execute("SELECT COUNT(*) FROM training_corrections") | |
| total_corrections = cursor.fetchone()[0] | |
| # Corrections by class | |
| cursor.execute(""" | |
| SELECT correct_label, COUNT(*) as count | |
| FROM training_corrections | |
| GROUP BY correct_label | |
| ORDER BY count DESC | |
| """) | |
| class_counts = dict(cursor.fetchall()) | |
| # Validated corrections | |
| cursor.execute("SELECT COUNT(*) FROM training_corrections WHERE validated = 1") | |
| validated_count = cursor.fetchone()[0] | |
| # Recent corrections (last 7 days) | |
| cursor.execute(""" | |
| SELECT COUNT(*) FROM training_corrections | |
| WHERE created_at > datetime('now', '-7 days') | |
| """) | |
| recent_corrections = cursor.fetchone()[0] | |
| # Average difficulty score | |
| cursor.execute("SELECT AVG(difficulty_score) FROM training_corrections") | |
| avg_difficulty = cursor.fetchone()[0] or 0.0 | |
| return { | |
| 'total_corrections': total_corrections, | |
| 'validated_corrections': validated_count, | |
| 'recent_corrections': recent_corrections, | |
| 'class_distribution': class_counts, | |
| 'average_difficulty': round(avg_difficulty, 3), | |
| 'unique_classes': len(class_counts) | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get training stats: {e}") | |
| return {} | |
| def mark_corrections_used(self, correction_ids: List[int]) -> bool: | |
| """Mark corrections as used for training""" | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.cursor() | |
| placeholders = ','.join(['?'] * len(correction_ids)) | |
| cursor.execute(f""" | |
| UPDATE training_corrections | |
| SET used_for_training = 1 | |
| WHERE id IN ({placeholders}) | |
| """, correction_ids) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to mark corrections as used: {e}") | |
| return False | |
| # Global database instance | |
| try: | |
| db = NAVADADatabase() | |
| logger.info("Database instance created successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to create database instance: {e}") | |
| db = None |